import datetime
import errno
import itertools
import logging
import os
import pickle
import socket
import threading
import time
import traceback
import warnings
import zlib
from builtins import input
from io import open
import typing
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.wsgi import WSGIContainer
from boofuzz import (
blocks,
constants,
event_hook,
exception,
helpers,
pgraph,
primitives,
Request
)
from boofuzz.loggers import fuzz_logger, fuzz_logger_curses, fuzz_logger_text, fuzz_logger_postgres
from boofuzz.exception import BoofuzzFailure
from boofuzz.monitors import CallbackMonitor
from boofuzz.mutation_context import MutationContext
from boofuzz.protocol_session import ProtocolSession
from boofuzz.web.app import app
from boofuzz.primitives.static import Static
from .connection import Connection
from .session_info import SessionInfo
from .web_app import WebApp
from .target import Target
from boofuzz.connections import UDPSocketConnection
CallbackFunction: typing.TypeAlias = typing.Callable
def open_test_run(db_name, db_table_name, port=constants.DEFAULT_WEB_UI_PORT, address=constants.DEFAULT_WEB_UI_ADDRESS):
s = SessionInfo(db_name=db_name, db_table_name=db_table_name)
w = WebApp(session_info=s, web_port=port, web_address=address)
w.server_init()
def get_datetime():
return datetime.datetime.now(datetime.UTC).replace(microsecond=0).isoformat().removesuffix('+00:00')
[docs]
class Session(pgraph.Graph):
"""
Extends pgraph.graph and provides a container for architecting protocol dialogs.
Args:
session_filename (str): Filename to serialize persistent data to. Default None.
index_start (int); First test case of library round to run
index_end (int); Last test case index to run
sleep_time (float): Time in seconds to sleep in between tests. Default 0.
restart_interval (int): Restart the target after n test cases, disable by setting to 0 (default).
console_gui (bool): Use curses to generate a static console screen similar to the webinterface. Has not been
tested under Windows. Works only if fuzz_loggers and log_level_stdout are kept to None.
Default False.
crash_threshold_request (int): Maximum number of crashes allowed before a request is exhausted. Default 12.
crash_threshold_element (int): Maximum number of crashes allowed before an element is exhausted. Default 3.
restart_sleep_time (int): Time in seconds to sleep when target can't be restarted. Default 5.
restart_callbacks (list of method): The registered method will be called after a failed post_test_case_callback
Default None.
restart_threshold (int): Maximum number of retries on lost target connection. Default None (indefinitely).
restart_timeout (float): Time in seconds for that a connection attempt should be retried. Default None
(indefinitely).
pre_send_callbacks (list of method): The registered method will be called prior to each fuzz request.
Default None.
post_test_case_callbacks (list of method): The registered method will be called after each fuzz test case.
Default None.
post_start_target_callbacks (list of method): Method(s) will be called after the target is started or restarted,
say, by a process monitor.
web_port (int or None): Port for monitoring fuzzing campaign via a web browser. Set to None to disable the web
app. Default 26000.
keep_web_open (bool): Keep the webinterface open after session completion. Default True.
log_level_stdout (int): If fuzz_loggers is kept to None, a FuzzLoggerText to stdout will be created with this
log level. See FuzzLoggerText for explanation on log levels.
fuzz_loggers (list of ifuzz_logger.IFuzzLogger): For saving test data and results. Default Log to stdout with a
log level of 0.
fuzz_db_keep_only_n_pass_cases (int): Minimize disk usage by only saving passing test cases
if they are in the n test cases preceding a failure or error.
Set to 0 to save after every test case (high disk I/O!). Default 0.
receive_data_after_each_request (bool): If True, Session will attempt to receive a reply after transmitting
each non-fuzzed node. Default True.
check_data_received_each_request (bool): If True, Session will verify that some data has
been received after transmitting each non-fuzzed node, and if not,
register a failure. If False, this check will not be performed. Default
False. A reception attempt is still made unless
receive_data_after_each_request is False.
receive_data_after_fuzz (bool): If True, Session will attempt to receive a reply after transmitting
a fuzzed message. Default False.
ignore_connection_reset (bool): Log ECONNRESET errors ("Target connection reset") as "info" instead of
failures.
ignore_connection_aborted (bool): Log ECONNABORTED errors as "info" instead of failures.
ignore_connection_issues_when_sending_fuzz_data (bool): Ignore fuzz data transmission failures. Default True.
This is usually a helpful setting to enable, as targets may drop connections once a
message is clearly invalid.
ignore_connection_ssl_errors (bool): Log SSL related errors as "info" instead of failures. Default False.
reuse_target_connection (bool): If True, only use one target connection instead of reconnecting each test case.
Default False.
target (Target): Target for fuzz session. Target must be fully initialized. Default None.
db_filename (str): Not in use.
Filename to store sqlite db for test results and case information.
Defaults to ./boofuzz-results/{uniq_timestamp}.db
db_name (str): Name of database. Defaults to {uniq_timestamp}
db_table_name (str | None): Name of table in database.
campaign_folder (str): Folder to store campaign files. Default None.
web_address: Address where's Boofuzz logger exposed. Default 'localhost'
round_type (str): library - random_mutation - random_generation define how to mutate in the current round.
Default "library"
seed_index (int): Starting seed index that will be incremented for each round, Default 0.
seed_index is concatenated with round_type to create a unique seed for each round.
max_number_of_rounds (int): Maximum number of round. Useful in replay mode. Default to 0 (infinite.)
nominal_test_interval (int): Test the health of target with nominal data after n test cases.
nominal_data (list[Request | CallbackFunction] | None): List of nominal data. Call set_nominal_data().
nominal_recv_test (typing.Callable[['Session'], bool] | None): Test function after nominal data test.
seconds_to_wait_after_restart (int): Time in seconds to wait after a target restart. Default 3.
rto_alpha_value (float) : See the :meth:`Request.calculate_rto` method for more details.
rto_beta_value (float) : See the :meth:`Request.calculate_rto` method for more details.
max_depth (int): Maximum combinatorial depth used for fuzzing.
num_mutations will return None if this value is None or greater than 1, as the number of mutations is typically very large when using combinatorial fuzzing.
Set to 1 for "simple" fuzzing.
.. versionchanged:: 0.4.2
This class has been moved into the sessions subpackage. The full path is now boofuzz.sessions.session.Session.
"""
def __init__(
self,
session_filename=None,
index_start=1,
index_end=None,
sleep_time=0.0,
restart_interval=0,
web_port=constants.DEFAULT_WEB_UI_PORT,
keep_web_open=True,
console_gui=False,
crash_threshold_request=12,
crash_threshold_element=3,
restart_sleep_time=5,
restart_callbacks=None,
restart_threshold=None,
restart_timeout=None,
pre_send_callbacks=None,
post_test_case_callbacks=None,
post_start_target_callbacks=None,
rto_alpha_value:None|float=0.125,
rto_beta_value:None|float=0.25,
log_level_stdout=None,
fuzz_loggers=None,
fuzz_db_keep_only_n_pass_cases=0,
receive_data_after_each_request=True,
check_data_received_each_request=False,
receive_data_after_fuzz=True,
ignore_connection_reset=False,
ignore_connection_aborted=False,
ignore_connection_issues_when_sending_fuzz_data=True,
ignore_connection_ssl_errors=False,
reuse_target_connection=False,
target: Target = None,
target_to_use=0,
web_address=constants.DEFAULT_WEB_UI_ADDRESS,
db_filename=None,
db_name: str = None,
db_table_name: str | None = None,
campaign_folder=None,
seed_index: int = 0,
round_type: str = "library",
max_number_of_rounds: int = 0,
_total_random_mutation_rounds=0,
nominal_test_interval: int = 50,
nominal_data: list[Request | CallbackFunction] | None = None,
nominal_recv_test: typing.Callable[['Session'], bool] | None = None,
seconds_to_wait_after_restart: int = 3,
max_depth: int = 1,
):
self._ignore_connection_reset = ignore_connection_reset
self._ignore_connection_aborted = ignore_connection_aborted
self._ignore_connection_issues_when_sending_fuzz_data = ignore_connection_issues_when_sending_fuzz_data
self._reuse_target_connection = reuse_target_connection
self._ignore_connection_ssl_errors = ignore_connection_ssl_errors
super(Session, self).__init__()
self.session_filename = session_filename
self._index_start = max(index_start, 1)
self._index_end = index_end
self.sleep_time = sleep_time
self.seconds_to_wait_after_restart = seconds_to_wait_after_restart
self.restart_interval = restart_interval
self.web_port = web_port
self._keep_web_open = keep_web_open
self.console_gui = console_gui
self._crash_threshold_node = crash_threshold_request
self._crash_threshold_element = crash_threshold_element
self.restart_sleep_time = restart_sleep_time
self.restart_threshold = restart_threshold
self.restart_timeout = restart_timeout
self.web_address = web_address
self.rto_alpha_value = rto_alpha_value
self.rto_beta_value = rto_beta_value
if fuzz_loggers is None:
fuzz_loggers = []
if log_level_stdout is not None:
fuzz_loggers.append(fuzz_logger_text.FuzzLoggerText(log_level=log_level_stdout))
else:
if self.console_gui and os.name != "nt":
fuzz_loggers.append(
fuzz_logger_curses.FuzzLoggerCurses(web_port=self.web_port, web_address=self.web_address)
)
self._keep_web_open = False
else:
fuzz_loggers.append(fuzz_logger_text.FuzzLoggerText())
# self._run_id = datetime.datetime.utcnow().replace(microsecond=0).isoformat().replace(":", "-")
if db_filename is not None:
warnings.warn(
"db_filename is no longer of any use",
)
if db_name is not None:
self._db_name = db_name
else:
self._db_name = get_datetime()
self._db_table_name = db_table_name
self._db_logger = fuzz_logger_postgres.FuzzLoggerPostgres(
db_name=self._db_name, db_table_name=self._db_table_name, num_log_cases=fuzz_db_keep_only_n_pass_cases
)
self.campaign_folder = campaign_folder
self._crash_filename = "boofuzz-crash-bin-{0}".format(get_datetime())
self._fuzz_data_logger = fuzz_logger.FuzzLogger(fuzz_loggers=[self._db_logger] + fuzz_loggers)
self._check_data_received_each_request = check_data_received_each_request
self._receive_data_after_each_request = receive_data_after_each_request
self._receive_data_after_fuzz = receive_data_after_fuzz
self._skip_current_node_after_current_test_case = False
self._skip_current_element_after_current_test_case = False
self.start_time = time.time()
self.end_time = None
self.cumulative_pause_time = 0
if self.web_port is not None:
self.web_interface_thread = self.build_webapp_thread(port=self.web_port, address=self.web_address)
if pre_send_callbacks is None:
pre_send_methods = []
else:
pre_send_methods = pre_send_callbacks
if post_test_case_callbacks is None:
post_test_case_methods = []
else:
post_test_case_methods = post_test_case_callbacks
if post_start_target_callbacks is None:
post_start_target_methods = []
else:
post_start_target_methods = post_start_target_callbacks
if restart_callbacks is None:
restart_methods = []
else:
restart_methods = restart_callbacks
self._callback_monitor = CallbackMonitor(
on_pre_send=pre_send_methods,
on_post_send=post_test_case_methods,
on_restart_target=restart_methods,
on_post_start_target=post_start_target_methods,
)
self.total_num_mutations = 0 # total available protocol mutations (before combining multiple mutations)
self.total_mutant_index = 0 # index within all mutations iterated through, including skipped mutations
self.mutant_index = 0 # index within currently mutating element
self.num_cases_actually_fuzzed = 0
self.fuzz_node: Request | None = None # Request object currently being fuzzed
self.current_test_case_name = ""
self.targets: list[Target] = []
self.target_to_use = target_to_use
self.monitor_results = {} # map of test case indices to list of crash synopsis strings (failed cases only)
# map of test case indices to list of supplement captured data (all cases where data was captured)
self.monitor_data = {}
self.is_paused = False
self.crashing_primitives = {}
self.on_failure = event_hook.EventHook()
self.max_depth = max_depth
# import settings if they exist.
self.import_file()
# create a root node. we do this because we need to start fuzzing from a single point and the user may want
# to specify a number of initial requests.
self.root = pgraph.Node()
self.root.label = "__ROOT_NODE__"
self.root.name = self.root.label
self.last_recv = None
self.last_send = None
self.seed_index: int = max(seed_index, 0)
self.seed: str = "None"
self.max_number_of_rounds = max_number_of_rounds
self.total_num_round = 0
self.continue_case = True
# Nominal data input :
self._nominal_data: list[Request | CallbackFunction] = [] # Use add_nominal_data() to add request at this list
self.nominal_test_interval = nominal_test_interval
self.set_nominal_data(nominal_data)
if nominal_recv_test is not None:
self.nominal_recv_test: typing.Callable[[Session], bool] = nominal_recv_test
else:
def f(session: Session):
return True
self.nominal_recv_test: typing.Callable[[Session], bool] = f
# Seed for the current mutation. It is a concatenation of round_type and seed_index.
# Is set at "0" by default instead of None, but is not used in the library mutation type.
# This is just to avoid to sum with a None value in the fuzzable class.
self.round_type: str = round_type
# library / random_mutation / random_generation define how to mutate in the current round
self._total_random_mutation_rounds = _total_random_mutation_rounds
self.add_node(self.root)
if target is not None:
# Set logger
target.set_fuzz_data_logger(fuzz_data_logger=self._fuzz_data_logger)
# Define method to apply options to monitor, that is passed to the target as a monitor
def apply_options(monitor, *args, **kwargs):
monitor.set_options(crash_filename=self._crash_filename)
return
target.monitor_alive.append(apply_options)
try:
self.add_target(target)
except exception.BoofuzzRpcError as e:
self._fuzz_data_logger.log_error(str(e))
raise
[docs]
def set_nominal_data(self, nominal_data: list[Request | CallbackFunction] | None) -> None:
"""
This function is used to set the nominal to test the target application every time the nominal_test_interval is
reach.
The input is a list which contain Requests or Callback Functions. These elements will be used in order to test
the target function. It's like a tree in fuzzungus but with only one path.
All primitive who constitute the Requests have to be Static. Because it's nominal data, fuzzungus didn't have
to fuzz these data.
If the input is None it will remove any nominal data.
"""
if nominal_data is None:
nominal_data = []
self._nominal_data = []
for data in nominal_data:
if isinstance(data, Request):
for primitive in data.walk(): # Verify that the request only contain Static element
if not isinstance(primitive, Static):
raise Exception(
f'The nominal data request that you try to set must contain only Static primitive. '
f'{data.stack}'
)
self._nominal_data.append(data)
elif callable(data):
self._nominal_data.append(data)
else:
raise Exception(f'The nominal data that you try to set is invalid.'
f'It has to be a Request containing only Static primitive or a callback function.'
f'{data}')
[docs]
def nominal_test(self) -> None:
"""
This function is call each time the nominal_test_interval is reach.
It is responsible for :
* Open the target's connection
* Send the static requests or call the callback functions in order
* Detect a failure
* Close the target's connection
* Log everything
"""
if not self._nominal_data:
return
self._fuzz_data_logger.open_test_step(f"nominal test interval of {self.nominal_test_interval} reached")
self.target_to_use = 0
target = self.targets[0]
self._open_connection_keep_trying(target)
# pre
self._pre_send(target)
for data in self._nominal_data:
if isinstance(data, Request):
self._fuzz_data_logger.log_info(f'Transmit nominal data {data.name}')
old_receive_data_after_each_request = self._receive_data_after_each_request
self._receive_data_after_each_request = True
self.transmit_normal(sock=None,
node=data,
edge=None,
callback_data=b'',
mutation_context=None
)
self._receive_data_after_each_request = old_receive_data_after_each_request
else:
self._fuzz_data_logger.log_info(f'Callback nominal data {data}')
data(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
# post
failure_already_detected = False
if not self.nominal_recv_test(self):
failure_already_detected = True
self._fuzz_data_logger.log_fail(f'Fail during nominal test. {self.total_mutant_index=}')
self._check_for_passively_detected_failures(target, failure_already_detected=failure_already_detected)
if not self._reuse_target_connection:
target.close()
self._fuzz_data_logger.open_test_step("end of nominal test")
@property
def netmon_results(self):
raise NotImplementedError(
"netmon_results is now part of monitor_results and thus can't be accessed directly."
" Please update your code."
)
[docs]
def add_node(self, node):
"""
Add a pgraph node to the graph. We overload this routine to automatically generate and assign an ID whenever a
node is added.
Args:
node (pgraph.Node): Node to add to session graph
"""
node.number = len(self.nodes)
node.id = len(self.nodes)
if node.id not in self.nodes:
self.nodes[node.id] = node
return self
[docs]
def add_target(self, target: Target):
"""
Add a target to the session. Multiple targets can be added for parallel fuzzing.
Args:
target (Target): Target to add to session
"""
# pass specified target parameters to the PED-RPC server.
target.monitors_alive()
target.set_fuzz_data_logger(fuzz_data_logger=self._fuzz_data_logger)
if self._callback_monitor not in target.monitors:
target.monitors.append(self._callback_monitor)
# Give its parent target to the target's socket connection, for logging purposes.
target.get_connection().parent_target = target
# Give to target his parent session
target.parent_session = self
# add target to internal list.
self.targets.append(target)
[docs]
def connect(self, src, dst=None, callback=None):
if dst is None:
for request1 in src.requests:
self.connect_boofuzz(request1, callback=callback)
else:
dst.parent_session = self
for request1 in src.requests:
for request2 in dst.requests:
self.connect_boofuzz(request1, request2, callback=callback)
# Give to the requests access to the current session,
# so children of these requests could see this session parameters.
src.parent_session = self
[docs]
def connect_boofuzz(self, src, dst=None, callback=None):
"""
Create a connection between the two requests (nodes) and register an optional callback to process in between
transmissions of the source and destination request. The session class maintains a top level node that all
initial requests must be connected to. Example::
sess = sessions.session()
sess.connect(sess.root, s_get("HTTP"))
If given only a single parameter, sess.connect() will default to attaching the supplied node to the root node.
This is a convenient alias. The following line is identical to the second line from the above example::
sess.connect(s_get("HTTP"))
Leverage callback methods to handle situations such as challenge response systems.
A callback method must follow the message signature of :meth:`Session.example_test_case_callback`.
Remember to include \\*\\*kwargs for forward-compatibility.
Args:
src (str or Request (pgrah.Node)): Source request name or request node
dst (str or Request (pgrah.Node), optional): Destination request name or request node
callback (def, optional): Callback function to pass received data to between node xmits. Default None.
Returns:
pgraph.Edge: The edge between the src and dst.
"""
# if only a source was provided, then make it the destination and set the source to the root node.
if dst is None:
dst = src
src = self.root
# if source or destination is a name, resolve the actual node.
if isinstance(src, str):
src = self.find_node("name", src)
if isinstance(dst, str):
dst = self.find_node("name", dst)
# if source or destination is not in the graph, add it.
if src != self.root and self.find_node("name", src.name) is None:
self.add_node(src)
if self.find_node("name", dst.name) is None:
self.add_node(dst)
# create an edge between the two nodes and add it to the graph.
edge = Connection(src.id, dst.id, callback)
self.add_edge(edge)
return edge
@property
def parent_session(self):
"""Reference to the parent session of this request, so children can now access parameters of session."""
return self
@property
def exec_speed(self):
return self.total_mutant_index / self.runtime
@property
def runtime(self):
if self.end_time is not None:
t = self.end_time
else:
t = time.time()
return t - self.start_time - self.cumulative_pause_time
[docs]
def export_file(self):
"""
Dump various object values to disk.
:see: import_file()
"""
if not self.session_filename:
return
data = {
"session_filename": self.session_filename,
"index_start": self.total_mutant_index,
"sleep_time": self.sleep_time,
"restart_sleep_time": self.restart_sleep_time,
"restart_interval": self.restart_interval,
"web_port": self.web_port,
"web_address": self.web_address,
"crash_threshold": self._crash_threshold_node,
"total_num_mutations": self.total_num_mutations,
"total_mutant_index": self.total_mutant_index,
"monitor_results": self.monitor_results,
"is_paused": self.is_paused,
}
fh = open(self.session_filename, "wb+")
fh.write(zlib.compress(pickle.dumps(data, protocol=2)))
fh.close()
[docs]
def _start_target(self, target: Target):
started = False
for monitor in target.monitors:
if monitor.start_target(fuzz_data_logger=self._fuzz_data_logger):
started = True
break
if started:
for monitor in target.monitors:
monitor.post_start_target(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
[docs]
def import_file(self):
"""
Load various object values from disk.
:see: export_file()
"""
if self.session_filename is None:
return
try:
with open(self.session_filename, "rb") as f:
data = pickle.loads(zlib.decompress(f.read()))
except (IOError, zlib.error, pickle.UnpicklingError):
return
# update the skip variable to pick up fuzzing from last test case.
self._index_start = data["total_mutant_index"]
self.session_filename = data["session_filename"]
self.sleep_time = data["sleep_time"]
self.restart_sleep_time = data["restart_sleep_time"]
self.restart_interval = data["restart_interval"]
self.web_port = data["web_port"]
self.web_address = data["web_address"]
self._crash_threshold_node = data["crash_threshold"]
self.total_num_mutations = data["total_num_mutations"]
self.total_mutant_index = data["total_mutant_index"]
self.monitor_results = data["monitor_results"]
self.is_paused = data["is_paused"]
[docs]
def num_mutations(self):
"""
Number of total mutations in the graph. The logic of this routine is identical to that of fuzz(). See fuzz()
for inline comments. The member variable self.total_num_mutations is updated appropriately by this routine.
Returns:
int: Total number of mutations in this session.
"""
if self.max_depth is None or self.max_depth > 1:
self.total_num_mutations = 0
return self.total_num_mutations
return self._num_mutations_recursive()
[docs]
def _num_mutations_recursive(self, this_node=None, path=None):
"""Helper for num_mutations.
Args:
this_node (request (node)): Current node that is being fuzzed. Default None.
path (list): Nodes along the path to the current one being fuzzed. Default [].
Returns:
int: Total number of mutations in this session.
"""
if this_node is None:
this_node = self.root
self.total_num_mutations = 0
if path is None:
path = []
for edge in self.edges_from(this_node.id):
next_node = self.nodes[edge.dst]
self.total_num_mutations += next_node.get_num_mutations()
if edge.src != self.root.id:
path.append(edge)
self._num_mutations_recursive(next_node, path)
# finished with the last node on the path, pop it off the path stack.
if path:
path.pop()
return self.total_num_mutations
[docs]
def _pause_if_pause_flag_is_set(self):
"""
If that pause flag is raised, enter an endless loop until it is lowered.
"""
if self.is_paused:
pause_start = time.time()
while 1:
if self.is_paused:
time.sleep(1)
else:
break
self.cumulative_pause_time += time.time() - pause_start
[docs]
def _check_for_passively_detected_failures(self, target:Target, failure_already_detected=False):
"""Check for and log passively detected failures. Return True if any found.
Args:
target (Target): Target to be checked for failures.
failure_already_detected (bool): If a failure was already detected.
Returns:
bool: True if failures were found. False otherwise.
"""
has_crashed = False
if len(target.monitors) > 0:
self._fuzz_data_logger.open_test_step("Contact target monitors")
# So, we need to run through the array two times. First, we check
# if any of the monitors reported a failure and if so, we need to
# gather a crash synopsis from them. We don't know whether
# a monitor can provide a crash synopsis, but in any case, we'll
# check. In the second run, we try to get crash synopsis from the
# monitors that did not detect a crash as supplemental information.
finished_monitors = []
for monitor in target.monitors:
if not monitor.post_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self):
has_crashed = True
self._fuzz_data_logger.log_fail(
f"{str(monitor)} detected crash on test case #{self.total_mutant_index}: {monitor.get_crash_synopsis()}"
)
finished_monitors.append(monitor)
if not has_crashed and not failure_already_detected:
self._fuzz_data_logger.log_pass("No crash detected.")
else:
for monitor in set(target.monitors) - set(finished_monitors):
synopsis = monitor.get_crash_synopsis()
if len(synopsis) > 0:
self._fuzz_data_logger.log_fail(
"{0} provided additional information for crash on #{1}: {2}".format(
str(monitor), self.total_mutant_index, monitor.get_crash_synopsis()
)
)
return has_crashed
[docs]
def _get_monitor_data(self, target):
"""Query monitors for any data they may want to add to this test case.
Args:
target (Target): Monitor to query data from.
"""
for monitor in target.monitors:
data = monitor.retrieve_data()
if data is not None and len(data) > 0:
self._fuzz_data_logger.log_info(
"{0} captured {1} bytes of additional data for test case #{2}".format(
str(monitor), len(data), self.total_mutant_index
)
)
if self.total_mutant_index not in self.monitor_data:
self.monitor_data[self.total_mutant_index] = []
self.monitor_data[self.total_mutant_index] += [data]
[docs]
def _process_failures(self, target):
"""Process any failures in crash_synopses.
If crash_synopses contains any entries, perform these failure-related actions:
- log failure summary if needed
- save failures to self.monitor_results (for website)
- exhaust node if crash threshold is reached
- target restart
Should be called after each fuzz test case.
Args:
target (Target): Target to restart if failure occurred.
Returns:
bool: True if any failures were found; False otherwise.
"""
crash_synopses = self._fuzz_data_logger.failed_test_cases.get(self._fuzz_data_logger.most_recent_test_id, [])
if len(crash_synopses) > 0:
self._fuzz_data_logger.open_test_step("Failure summary")
# retrieve the primitive that caused the crash and increment it's individual crash count.
self.crashing_primitives[self.fuzz_node.mutant] = self.crashing_primitives.get(self.fuzz_node.mutant, 0) + 1
self.crashing_primitives[self.fuzz_node] = self.crashing_primitives.get(self.fuzz_node, 0) + 1
# print crash synopsis
if len(crash_synopses) > 1:
# Prepend a header if > 1 failure report, so that they are visible from the main web page
synopsis = "({0} reports) {1}".format(len(crash_synopses), "\n".join(crash_synopses))
else:
synopsis = "\n".join(crash_synopses)
self.monitor_results[self.total_mutant_index] = crash_synopses
self._fuzz_data_logger.log_info(synopsis)
# If there is a current primitive being mutated
# And the primitive that caused the crash has reached the maximum number of crashes allowed before a request is exhausted
# Skip it
if (
self.fuzz_node.mutant is not None
and self.crashing_primitives[self.fuzz_node] >= self._crash_threshold_node
):
skipped = max(0, self.fuzz_node.get_num_mutations() - self.mutant_index)
self._skip_current_node_after_current_test_case = True
self._fuzz_data_logger.open_test_step(
"Crash threshold reached for this request, exhausting {0} mutants.".format(skipped)
)
self.total_mutant_index += skipped
self.mutant_index += skipped
elif (
self.fuzz_node.mutant is not None
and self.crashing_primitives[self.fuzz_node.mutant] >= self._crash_threshold_element
):
if not isinstance(self.fuzz_node.mutant, primitives.Group) and not isinstance(
self.fuzz_node.mutant, blocks.Repeat
):
skipped = max(0, self.fuzz_node.mutant.get_num_mutations() - self.mutant_index)
self._skip_current_element_after_current_test_case = True
self._fuzz_data_logger.open_test_step(
"Crash threshold reached for this element, exhausting {0} mutants.".format(skipped)
)
self.total_mutant_index += skipped
self.mutant_index += skipped
self._restart_target(target)
return True
else:
return False
[docs]
def register_post_test_case_callback(self, method):
"""Register a post-test case method.
The registered method will be called after each fuzz test case.
Potential uses:
* Closing down a connection.
* Checking for expected responses.
The order of callback events is as follows::
pre_send() - req - callback ... req - callback - post-test-case-callback
Args:
method (function): A method with the same parameters as :func:`~Session.post_send`
"""
self._callback_monitor.on_post_send.append(method)
# noinspection PyUnusedLocal
[docs]
def example_test_case_callback(self, target, fuzz_data_logger, session, test_case_context, *args, **kwargs):
"""
Example call signature for methods given to :func:`~Session.connect` or
:func:`~Session.register_post_test_case_callback`
Args:
target (Target): Target with sock-like interface.
fuzz_data_logger (ifuzz_logger.IFuzzLogger): Allows logging of test checks and passes/failures.
Provided with a test case and test step already opened.
session (Session): Session object calling post_send.
Useful properties include last_send and last_recv.
test_case_context (ProtocolSession): Context for test case-scoped data.
:py:class:`ProtocolSession` :py:attr:`session_variables <ProtocolSession.session_variables>`
values are generally set within a callback and referenced in elements via default values of type
:py:class:`ProtocolSessionReference`.
args: Implementations should include \\*args and \\**kwargs for forward-compatibility.
kwargs: Implementations should include \\*args and \\**kwargs for forward-compatibility.
"""
# default to doing nothing.
self._fuzz_data_logger.log_info("No post_send callback registered.")
# noinspection PyMethodMayBeStatic
[docs]
def _pre_send(self, target: Target):
"""
Execute custom methods to run prior to each fuzz request. The order of events is as follows::
pre_send() - req - callback ... req - callback - post_send()
When fuzzing RPC for example, register this method to establish the RPC bind.
Args:
target (session.target): Target we are sending data to
"""
for monitor in target.monitors:
try:
self._fuzz_data_logger.open_test_step("Monitor {}.pre_send()".format(str(monitor)))
monitor.pre_send(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self)
except Exception as e:
...
self._fuzz_data_logger.log_error(
constants.ERR_CALLBACK_FUNC.format(func_name="{}.pre_send()".format(str(monitor)))
+ traceback.format_exc()
)
[docs]
def _restart_target(self, target: Target):
"""
Restart the fuzz target. If a VMControl is available revert the snapshot, if a process monitor is available
restart the target process. If custom restart methods are registered, execute them. Otherwise, do nothing.
Args:
target (session.target): Target we are restarting
Raises:
exception.BoofuzzRestartFailedError: if restart fails.
"""
# TODO: reuse_target_connection seems to be only handled when using
# a custom callback. wtf?
self._fuzz_data_logger.open_test_step("Restarting target")
restarted = False
if len(self.on_failure) > 0:
for f in self.on_failure:
self._fuzz_data_logger.open_test_step("Calling registered on_failure method")
f(logger=self._fuzz_data_logger)
restarted = True
# vm restarting is the preferred method so try that before monitors.
elif target.vmcontrol:
self._fuzz_data_logger.log_info("Restarting target virtual machine")
target.vmcontrol.restart_target()
restarted = True
# we always have at least one monitor; a Callback Monitor that handles all callbacks.
else:
for monitor in target.monitors:
self._fuzz_data_logger.log_info("Restarting target process using {}".format(monitor.__class__.__name__))
if monitor.restart_target(target=target, fuzz_data_logger=self._fuzz_data_logger, session=self):
# TODO: doesn't this belong in the process monitor?
self._fuzz_data_logger.log_info(
f"Giving the process {self.seconds_to_wait_after_restart} seconds to settle in")
time.sleep(self.seconds_to_wait_after_restart)
restarted = True
break
if restarted:
for monitor in target.monitors:
monitor.post_start_target(target=self.targets[self.target_to_use],
fuzz_data_logger=self._fuzz_data_logger, session=self)
else:
self._fuzz_data_logger.log_info(
"No reset handler available... sleeping for {} seconds".format(self.restart_sleep_time)
)
time.sleep(self.restart_sleep_time)
# pass specified target parameters to the PED-RPC server to re-establish connections.
target.monitors_alive()
[docs]
def server_init(self):
"""Called by fuzz() to initialize variables, web interface, etc."""
if self.web_port is not None:
if not self.web_interface_thread.is_alive():
# spawn the web interface.
self.web_interface_thread.start()
[docs]
def _callback_current_node(self, node, edge, test_case_context):
"""Execute callback preceding current node.
Args:
test_case_context (ProtocolSession): Context for test case-scoped data.
node (pgraph.node.node (Node), optional): Current Request/Node
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
Returns:
bytes: Data rendered by current node if any; otherwise None.
"""
data = None
# if the edge has a callback, process it. the callback has the option to render the node, modify it and return.
if edge.callback:
self._fuzz_data_logger.open_test_step("Callback function '{0}'".format(edge.callback.__name__))
data = edge.callback(
self.targets[self.target_to_use],
self._fuzz_data_logger,
session=self,
node=node,
edge=edge,
test_case_context=test_case_context,
)
return data
[docs]
def fragmentation_check(self, sock, node: Request, edge, callback_data, mutation_context, transmit_type):
"""
Fragment data is needed, then call the original transmit_fuzz() method.
Args:
sock (Target, optional): Socket-like object on which to transmit node
node (pgraph.node.node (Node), optional): Request/Node to transmit
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
callback_data (bytes): Data from previous callback.
mutation_context (MutationContext): Current mutation context.
"""
if node.fragmentation is None:
self.transmit_all(sock, node, edge, callback_data, mutation_context, transmit_type)
else:
for data in node.fragmentation(session=self, sock=sock, node=node, edge=edge, callback_data=callback_data,
mutation_context=mutation_context, length=node.fragmentation_length):
self.transmit_all(sock, node, edge, callback_data=data, mutation_context=mutation_context,
transmit_type=transmit_type)
[docs]
def transmit_all(self, sock, node: Request, edge, callback_data, mutation_context, transmit_type):
"""
Parent method for all transmission (normal and fuzzed) methods.
This method is used to call the appropriate transmit method based on the current fuzzing state.
This is done to allow to measure elapsed time between each transmission and to log it.
Args :
sock (Target, optional): Socket-like object on which to transmit node
node (pgraph.node.node (Node), optional): Request/Node to transmit
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
callback_data (bytes): Data from previous callback.
mutation_context (MutationContext): active mutation context
transmit_type (str): Type of transmit. "normal" or "fuzz".
"""
# Get time before sending
starting_time = time.time()
# Check transmit type
if transmit_type == "normal":
self.transmit_normal(
sock,
node,
edge,
callback_data=callback_data,
mutation_context=mutation_context
)
elif transmit_type == "fuzz":
self.transmit_fuzz(
sock,
self.fuzz_node,
mutation_context.message_path[-1],
callback_data=callback_data,
mutation_context=mutation_context,
)
else:
self._fuzz_data_logger.log_error(f"Unknown transmit type: {transmit_type}")
# If the node has a timeout check, check if the elapsed time is greater than the RTO
if node.timeout_check:
# Get time after sending
ending_time = time.time()
# Get elapsed time
elapsed_time = ending_time - starting_time
# Log elapsed time
if node.rto < elapsed_time and node.timeout_check:
self._fuzz_data_logger.log_target_warn(f"RTO exceeded: {elapsed_time} > {node.rto}")
# Calculate new RTO
node.calculate_rto(elapsed_time, self.rto_alpha_value, self.rto_beta_value)
[docs]
def transmit_normal(self, sock, node: Request, edge, callback_data, mutation_context):
"""Render and transmit a non-fuzzed node, process callbacks accordingly.
Args:
sock (Target, optional): Socket-like object on which to transmit node
node (pgraph.node.node (Node), optional): Request/Node to transmit
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
callback_data (bytes): Data from previous callback.
mutation_context (MutationContext): active mutation context
"""
if callback_data:
data = callback_data
else:
data = node.render(mutation_context=mutation_context)
try: # send
self.targets[self.target_to_use].send(data)
self.last_send = data
except exception.BoofuzzTargetConnectionReset:
# TODO: Switch _ignore_connection_reset for _ignore_transmission_error, or provide retry mechanism
if self._ignore_connection_reset:
self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
else:
raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
except exception.BoofuzzTargetConnectionAborted as e:
# TODO: Switch _ignore_connection_aborted for _ignore_transmission_error, or provide retry mechanism
msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
if self._ignore_connection_aborted:
self._fuzz_data_logger.log_info(msg)
else:
raise BoofuzzFailure(msg)
except exception.BoofuzzSSLError as e:
if self._ignore_connection_ssl_errors:
self._fuzz_data_logger.log_info(str(e))
else:
raise BoofuzzFailure(message=str(e))
try: # recv
# if session is configured to receive data after each request, and if the node exists and has a
if node and(self._receive_data_after_each_request or node.answer_must_contain or
node.answer_must_not_contain) and node.receive_data_after_transmit:
connection = self.targets[self.target_to_use].get_connection()
if isinstance(connection, UDPSocketConnection) and not connection.bind:
connection.reuse_my_port()
self.last_recv = self.targets[self.target_to_use].recv()
if self._check_data_received_each_request:
self._fuzz_data_logger.log_check("Verify some data was received from the target.")
if not self.last_recv:
# Assume a crash?
raise BoofuzzFailure(message="Nothing received from target.")
else:
self._fuzz_data_logger.log_pass("Some data received from target.")
except exception.BoofuzzTargetConnectionReset:
if self._check_data_received_each_request:
raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
else:
self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
except exception.BoofuzzTargetConnectionAborted as e:
msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
if self._check_data_received_each_request:
raise BoofuzzFailure(msg)
else:
self._fuzz_data_logger.log_info(msg)
except exception.BoofuzzSSLError as e:
if self._ignore_connection_ssl_errors:
self._fuzz_data_logger.log_info(str(e))
else:
raise BoofuzzFailure(str(e))
self.last_send = data
[docs]
def transmit_fuzz(self, sock, node: Request, edge, callback_data, mutation_context):
"""
Original transmit_fuzz() method of boofuzz, now encapsulated in a parent method that allows for fragmentation.
Render and transmit a fuzzed node, process callbacks accordingly.
Args:
sock (Target, optional): Socket-like object on which to transmit node
node (pgraph.node.node (Node), optional): Request/Node to transmit
edge (pgraph.edge.edge (pgraph.edge), optional): Edge along the current fuzz path from "node" to next node.
callback_data (bytes): Data from previous callback.
mutation_context (MutationContext): Current mutation context.
"""
if callback_data:
data = callback_data
else:
data = self.fuzz_node.render(mutation_context)
try: # send
self.targets[self.target_to_use].send(data)
except exception.BoofuzzTargetConnectionReset:
if self._ignore_connection_issues_when_sending_fuzz_data:
self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
else:
raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
except exception.BoofuzzTargetConnectionAborted as e:
msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
if self._ignore_connection_issues_when_sending_fuzz_data:
self._fuzz_data_logger.log_info(msg)
else:
raise BoofuzzFailure(msg)
except exception.BoofuzzSSLError as e:
if self._ignore_connection_ssl_errors:
self._fuzz_data_logger.log_info(str(e))
else:
raise BoofuzzFailure(str(e))
try: # recv
if node and (self._receive_data_after_each_request or node.answer_must_contain or
node.answer_must_not_contain) and node.receive_data_after_transmit:
connection = self.targets[self.target_to_use].get_connection()
if isinstance(connection, UDPSocketConnection) and not connection.bind:
connection.reuse_my_port()
self.last_recv = self.targets[self.target_to_use].recv()
if node.answer_must_not_contain or node.answer_must_contain:
node.analyze_answer(data=self.last_recv, session=self)
except exception.BoofuzzTargetConnectionReset:
if self._check_data_received_each_request:
raise BoofuzzFailure(message=constants.ERR_CONN_RESET)
else:
self._fuzz_data_logger.log_info(constants.ERR_CONN_RESET)
except exception.BoofuzzTargetConnectionAborted as e:
msg = constants.ERR_CONN_ABORTED.format(socket_errno=e.socket_errno, socket_errmsg=e.socket_errmsg)
if self._check_data_received_each_request:
raise BoofuzzFailure(msg)
else:
self._fuzz_data_logger.log_info(msg)
pass
except exception.BoofuzzSSLError as e:
if self._ignore_connection_ssl_errors:
self._fuzz_data_logger.log_info(str(e))
else:
self._fuzz_data_logger.log_fail(str(e))
raise BoofuzzFailure(str(e))
self.last_send = data
[docs]
def build_webapp_thread(self, port=constants.DEFAULT_WEB_UI_PORT,
address=constants.DEFAULT_WEB_UI_ADDRESS) -> threading.Thread:
"""
Create Webapp interface.
"""
app.session = self
http_server = HTTPServer(WSGIContainer(app))
while True:
try:
http_server.listen(port, address=address)
except socket.error as exc:
# Only handle "Address already in use"
if exc.errno != errno.EADDRINUSE:
raise
port += 1
else:
self._fuzz_data_logger.log_info("Web interface can be found at http://%s:%d" % (address, port))
break
flask_thread = threading.Thread(target=IOLoop.instance().start)
flask_thread.daemon = True
return flask_thread
[docs]
def feature_check(self):
"""Check all messages/features.
Returns:
None
"""
self.total_mutant_index = 0
self.total_num_mutations = self.num_mutations()
for path in self._iterate_protocol_message_paths():
self._message_check(path)
[docs]
def calculate_total_round(self):
"""Check how many random_mutation round is needed
Check the max(min(sizeof(Seclist), max_rounds_mutation)) and set _total_random_mutation_rounds
Returns:
None
"""
memo = self.round_type
self.round_type = "library"
for node in self.nodes:
if node != 0:
for item in self.nodes[node].stack:
if item.fuzzable:
num_mutation = item.get_num_mutations()
min_num_mutation_num_random_mutations = min(num_mutation, item.max_rounds_mutation)
if min_num_mutation_num_random_mutations > self._total_random_mutation_rounds:
self._total_random_mutation_rounds = min_num_mutation_num_random_mutations
self.round_type = memo
[docs]
def check_max_number_of_rounds(self) -> None:
""" Function use to increment and check if the total number of round go beyond the max number of round. """
self.total_num_round += 1
if self.max_number_of_rounds == 0:
return
if self.total_num_round >= self.max_number_of_rounds:
self._fuzz_data_logger.log_error(
f'Stop because total_num_round>=max_number_of_rounds ({self.max_number_of_rounds})')
exit(0)
[docs]
def fuzz_indefinitely(self, name=None):
"""
Parent function of fuzz(). Is only used to call fuzz() in a loop.
It fuzzes first with library mutations, then with random mutations, and then indefinitely with random
generations.
"""
self._keep_web_open = False
self.calculate_total_round() # Set _total_random_mutation_rounds
if self.round_type == "library":
self.fuzz(name=name)
self._index_start = 1
self.round_type = "random_mutation"
self.check_max_number_of_rounds()
if self.round_type == "random_mutation":
# Increment the seed index, from the starting seed to the total number of random mutation rounds
for self.seed_index in range(self.seed_index, self._total_random_mutation_rounds):
# Concatenate the mutation index with the mutation type to create a unique seed index
# Otherwise the seed index will be the same for each mutation type
self.seed = self.round_type + '.' + str(self.seed_index)
self.fuzz(name=name)
self.check_max_number_of_rounds()
# At the end of the random mutation rounds, set the mutation type to random generation
self.round_type = "random_generation"
self.seed_index = 0
if self.round_type == "random_generation":
# Is the "indefinitely" of "fuzz_indefinitely"
while True:
self.seed = self.round_type + '.' + str(self.seed_index)
self.fuzz(name=name)
self.seed_index += 1
self.check_max_number_of_rounds()
[docs]
def fuzz(self, name=None):
"""Fuzz the entire protocol tree.
Iterates through and fuzzes all fuzz cases, skipping according to
self.skip and restarting based on self.restart_interval.
If you want the web server to be available, your program must persist
after calling this method. helpers.pause_for_signal() is
available to this end.
Args:
name (str): Pass in a Request name to fuzz only a single request message. Pass in a test case name to fuzz
only a single test case.
Returns:
None
"""
if name is None or name == "":
self.total_num_mutations += self.num_mutations()
self._main_fuzz_loop(self._generate_mutations_indefinitely())
else:
path, mutations = helpers.parse_test_case_name(name)
if len(mutations) < 1:
self._fuzz_single_node_by_path(path)
else:
self.total_num_mutations = 1
node_edges = self._path_names_to_edges(node_names=path)
self._main_fuzz_loop(self._generate_test_case_from_named_mutations(node_edges, mutations))
[docs]
def fuzz_by_name(self, name):
"""Fuzz a particular test case or node by name.
Args:
name (str): Name of node.
.. deprecated :: 0.4.0
Use :meth:`Session.fuzz` instead.
"""
warnings.warn(
"Session.fuzz_by_name is deprecated in favor of Session.fuzz(name=name).",
FutureWarning,
)
self.fuzz(name=name)
[docs]
def _fuzz_single_node_by_path(self, node_names):
"""Fuzz a particular node via the path in node_names.
Args:
node_names (list of str): List of node names leading to target.
"""
node_edges = self._path_names_to_edges(node_names=node_names)
self.total_mutant_index = 0
self.total_num_mutations = self.nodes[node_edges[-1].dst].get_num_mutations()
self._main_fuzz_loop(self._generate_mutations_indefinitely(path=node_edges))
[docs]
def fuzz_single_case(self, mutant_index):
"""Deprecated: Fuzz a test case by mutant_index.
Deprecation note: The new approach is to set Session's start and end indices to the same value.
Args:
mutant_index (int): Positive non-zero integer.
Returns:
None
Raises:
sex.SulleyRuntimeError: If any error is encountered while executing the test case.
"""
warnings.warn(
"Session.fuzz_single_case is deprecated in favor of Session's index_start and index_end constructor "
"parameters."
)
self.total_mutant_index = 0
self.total_num_mutations = 1
self._main_fuzz_loop(self._generate_single_case_by_index(mutant_index))
[docs]
def _message_check(self, path):
"""Check messages for compatibility.
Preconditions: `self.total_mutant_index` and `self.total_num_mutations` are set properly.
Args:
path (list of Connection): Nodes (Requests) along the path to the target one.
Returns:
None
"""
self.server_init()
try:
self._check_message(MutationContext(message_path=path, mutations={}))
except KeyboardInterrupt:
# TODO: should wait for the end of the ongoing test case, and stop gracefully netmon and procmon
self.export_file()
self._fuzz_data_logger.log_error("SIGINT received ... exiting")
raise
except exception.BoofuzzRestartFailedError:
self._fuzz_data_logger.log_error("Restarting the target failed, exiting.")
self.export_file()
raise
except exception.BoofuzzTargetConnectionFailedError:
# exception should have already been handled but rethrown in order to escape test run
pass
except Exception:
self._fuzz_data_logger.log_error("Unexpected exception! {0}".format(traceback.format_exc()))
self.export_file()
raise
[docs]
def _main_fuzz_loop(self, fuzz_case_iterator):
"""Execute main fuzz logic; takes an iterator of test cases.
Preconditions: `self.total_mutant_index` and `self.total_num_mutations` are set properly.
Args:
fuzz_case_iterator (Iterable): An iterator that walks through fuzz cases and yields MutationContext objects.
See _iterate_single_node() for details.
Returns:
None
"""
self.server_init()
try:
self._start_target(self.targets[self.target_to_use])
if self._reuse_target_connection:
self.targets[self.target_to_use].open()
# self.num_cases_actually_fuzzed = 0
# self.start_time = time.time()
for mutation_context in fuzz_case_iterator:
if self.total_mutant_index < self._index_start:
continue
# Check restart interval
if (
self.num_cases_actually_fuzzed
and self.restart_interval
and self.num_cases_actually_fuzzed % self.restart_interval == 0
):
self._fuzz_data_logger.open_test_step(f"restart interval of {self.restart_interval} reached")
self._restart_target(self.targets[self.target_to_use])
self._fuzz_current_case(mutation_context)
self.num_cases_actually_fuzzed += 1
# Check nominal data test interval
if (
self.num_cases_actually_fuzzed
and self.nominal_test_interval
and self.num_cases_actually_fuzzed % self.nominal_test_interval == 0
):
self.nominal_test()
if self._index_end is not None and self.total_mutant_index >= self._index_end:
break
if self._reuse_target_connection:
self.targets[self.target_to_use].close()
if self._keep_web_open and self.web_port is not None:
self.end_time = time.time()
print(
"\nFuzzing session completed. Keeping webinterface up on {}:{}".format(
self.web_address, self.web_port
),
"\nPress ENTER to close webinterface",
)
input()
except KeyboardInterrupt:
# TODO: should wait for the end of the ongoing test case, and stop gracefully netmon and procmon
self.export_file()
# Assuming self._fuzz_data_logger_fuzz_loggers is the list of loggers
fuzz_logger_text_instance: fuzz_logger_text.FuzzLoggerText = next(
filter(lambda x: isinstance(x, fuzz_logger_text.FuzzLoggerText), self._fuzz_data_logger._fuzz_loggers),
None)
self._fuzz_data_logger.log_error("SIGINT received ... exiting")
logging_text = f"Total time: {time.time() - self.start_time} seconds\n" + self._fuzz_data_logger.failure_summary()
fuzz_logger_text_instance.log_recap(logging_text)
# Save the log recap to the campaign folder as a text file
if self.campaign_folder is not None:
with open(os.path.join(self.campaign_folder, constants.LOG_RECAP_NAME), "w", encoding="utf-8") as f:
# Write the recap
f.write(logging_text)
raise
except exception.BoofuzzRestartFailedError:
self._fuzz_data_logger.log_error("Restarting the target failed, exiting.")
self.export_file()
raise
except exception.BoofuzzTargetConnectionFailedError:
# exception should have already been handled but rethrown in order to escape test run
pass
except Exception:
self._fuzz_data_logger.log_error("Unexpected exception! {0}".format(traceback.format_exc()))
self.export_file()
raise
finally:
self._fuzz_data_logger.close_test()
[docs]
def _generate_single_case_by_index(self, test_case_index):
fuzz_index = 1
for m in self._generate_mutations_indefinitely():
if fuzz_index >= test_case_index:
self.total_mutant_index = 1
yield m
break
fuzz_index += 1
[docs]
def _generate_mutations_indefinitely(self, path=None):
"""Yield MutationContext with n mutations per message over all messages, with n increasing indefinitely."""
depth = 1
while self.max_depth is None or depth <= self.max_depth:
valid_case_found_at_this_depth = False
for m in self._generate_n_mutations(depth=depth, path=path):
valid_case_found_at_this_depth = True
yield m
if not valid_case_found_at_this_depth:
break
depth += 1
[docs]
def _generate_n_mutations(self, depth, path):
"""Yield MutationContext with n mutations per message over all messages."""
for path in self._iterate_protocol_message_paths(path=path):
for m in self._generate_n_mutations_for_path(path, depth=depth):
yield m
[docs]
def _generate_n_mutations_for_path(self, path, depth):
"""Yield MutationContext with n mutations for a specific message.
Args:
path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
depth (int): Yield sets of depth mutations.
Yields:
MutationContext: A MutationContext containing one mutation.
"""
for mutations in self._generate_n_mutations_for_path_recursive(path, depth=depth):
if not self._mutations_contain_duplicate(mutations):
self.total_mutant_index += 1
yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})
[docs]
def _generate_n_mutations_for_path_recursive(self, path, depth, skip_elements=None):
if skip_elements is None:
skip_elements = set()
if depth == 0:
yield []
return
new_skip = skip_elements.copy()
for mutations in self._generate_mutations_for_request(path=path, skip_elements=skip_elements):
new_skip.update(m.qualified_name for m in mutations)
for ms in self._generate_n_mutations_for_path_recursive(path, depth=depth - 1, skip_elements=new_skip):
yield mutations + ms
[docs]
def _iterate_protocol_message_paths(self, path=None):
"""
Iterates over protocol and yields a path (list of Connection) leading to a given message.
Args:
path (list of Connection): Provide a specific path to yield only that specific path.
Yields:
list of Connection: List of edges along the path to the current one being fuzzed.
Raises:
exception.SulleyRuntimeError: If no requests defined or no targets specified
"""
# we can't fuzz if we don't have at least one target and one request.
if not self.targets:
raise exception.SullyRuntimeError("No targets specified in session")
if not self.edges_from(self.root.id):
raise exception.SullyRuntimeError("No requests specified in session")
if path is not None:
yield path
else:
for x in self._iterate_protocol_message_paths_recursive(this_node=self.root, path=[]):
yield x
[docs]
def _iterate_protocol_message_paths_recursive(self, this_node, path):
"""Recursive helper for _iterate_protocol.
Args:
this_node (node.Node): Current node that is being fuzzed.
path (list of Connection): List of edges along the path to the current one being fuzzed.
Yields:
list of Connection: List of edges along the path to the current one being fuzzed.
"""
# step through every edge from the current node.
for edge in self.edges_from(this_node.id):
# keep track of the path as we fuzz through it, don't count the root node.
# we keep track of edges as opposed to nodes because if there is more than one path through a set of
# given nodes we don't want any ambiguity.
path.append(edge)
message_path = self._message_path_to_str(path)
logging.debug("fuzzing: {0}".format(message_path))
self.fuzz_node = self.nodes[path[-1].dst]
yield path
# recursively fuzz the remainder of the nodes in the session graph.
for x in self._iterate_protocol_message_paths_recursive(self.fuzz_node, path):
yield x
# finished with the last node on the path, pop it off the path stack.
if path:
path.pop()
[docs]
def _mutations_contain_duplicate(self, mutations):
names = [m.qualified_name for m in mutations]
for name1, name2 in itertools.combinations(names, r=2):
if name1 in name2 or name2 in name1:
return True
return False
[docs]
def _generate_mutations_for_request(self, path, skip_elements=None):
"""Yield each mutation for a specific message (the last message in path).
Args:
path (list of Connection): Nodes (Requests) along the path to the current one being fuzzed.
path (iter of str): Qualified names of elements to skip while fuzzing.
Yields:
Mutation: Mutation object describing a single mutation.
"""
if skip_elements is None:
skip_elements = []
self.fuzz_node = self.nodes[path[-1].dst]
self.mutant_index = 0
for mutations in self.fuzz_node.get_mutations(skip_elements=skip_elements):
self.mutant_index += 1
yield mutations
if self._skip_current_node_after_current_test_case:
self._skip_current_node_after_current_test_case = False
break
elif self._skip_current_element_after_current_test_case:
self.fuzz_node.mutant.stop_mutations()
self._skip_current_element_after_current_test_case = False
continue
[docs]
def _generate_test_case_from_named_mutations(self, path, mutation_names):
# need a way to get the mutation value based on the mutation index
self.fuzz_node = self.nodes[path[-1].dst]
self.mutant_index = 0
mutations = []
for mutation_name in mutation_names:
qualified_name, index = mutation_name.rsplit(":")
index = int(index)
fuzzable = self.fuzz_node.names[qualified_name]
mutations += next(itertools.islice(fuzzable.get_mutations(), index, index + 1))
self.total_mutant_index += 1
yield MutationContext(message_path=path, mutations={n.qualified_name: n for n in mutations})
[docs]
def _path_names_to_edges(self, node_names):
"""Take a list of node names and return a list of edges describing that path.
Args:
node_names (list of str): List of node names describing a path.
Returns:
list of Connection: List of edges describing the path in node_names.
"""
cur_node = self.root
edge_path = []
for node_name in node_names:
next_node = None
for edge in self.edges_from(cur_node.id):
if self.nodes[edge.dst].name == node_name:
edge_path.append(edge)
next_node = self.nodes[edge.dst]
break
if next_node is None:
raise Exception("No edge found from {0} to {1}".format(cur_node.name, node_name))
else:
cur_node = next_node
return edge_path
[docs]
def _check_message(self, mutation_context):
"""Sends the current message without fuzzing.
Current test case is controlled by fuzz_case_iterator().
Args:
mutation_context (MutationContext): Current mutation context.
"""
target = self.targets[self.target_to_use]
self.total_mutant_index += 1
self._pause_if_pause_flag_is_set()
test_case_name = self._test_case_name_feature_check(mutation_context)
self._fuzz_data_logger.open_test_case(
f'{self.total_mutant_index}: {test_case_name}',
name=test_case_name,
index=self.total_mutant_index,
num_mutations=self.total_num_mutations,
current_index=self.mutant_index,
current_num_mutations=self.fuzz_node.get_num_mutations(),
round_type=self.round_type,
seed=self.seed,
seed_index=self.seed_index
)
try:
self._open_connection_keep_trying(target)
self._pre_send(target)
for e in mutation_context.message_path[:-1]:
prev_node = self.nodes[e.src]
node = self.nodes[e.dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
self._fuzz_data_logger.open_test_step("Prep Node '{0}'".format(node.name))
callback_data = self._callback_current_node(node=node, edge=e, test_case_context=protocol_session)
self.transmit_normal(target, node, e, callback_data=callback_data, mutation_context=mutation_context)
prev_node = self.nodes[mutation_context.message_path[-1].src]
node = self.nodes[mutation_context.message_path[-1].dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
callback_data = self._callback_current_node(
node=self.fuzz_node, edge=mutation_context.message_path[-1], test_case_context=protocol_session
)
self._fuzz_data_logger.open_test_step("Node Under Test '{0}'".format(self.fuzz_node.name))
self.transmit_normal(
target,
self.fuzz_node,
mutation_context.message_path[-1],
callback_data=callback_data,
mutation_context=mutation_context,
)
self._check_for_passively_detected_failures(target)
if not self._reuse_target_connection:
target.close()
if self.sleep_time > 0:
self._fuzz_data_logger.open_test_step("Sleep between tests.")
self._fuzz_data_logger.log_info("sleeping for %f seconds" % self.sleep_time)
time.sleep(self.sleep_time)
finally:
if self._process_failures(target=target):
print("FAIL: {0}".format(test_case_name))
else:
print("PASS: {0}".format(test_case_name))
self._get_monitor_data(target)
self._fuzz_data_logger.close_test_case()
self.export_file()
[docs]
def _fuzz_current_case(self, mutation_context: MutationContext):
"""
Fuzzes the current test case. Current test case is controlled by
fuzz_case_iterator().
Args:
mutation_context (MutationContext): Current mutation context.
"""
self.continue_case = True
target: Target = self.targets[self.target_to_use]
self._pause_if_pause_flag_is_set()
test_case_name = self._test_case_name(mutation_context)
self.current_test_case_name = test_case_name
self._fuzz_data_logger.open_test_case(
f'{self.total_mutant_index}: {test_case_name}',
name=test_case_name,
index=self.total_mutant_index,
num_mutations=self.total_num_mutations,
current_index=self.mutant_index,
current_num_mutations=self.fuzz_node.get_num_mutations(),
round_type=self.round_type,
seed=self.seed,
seed_index=self.seed_index
)
if self.total_num_mutations is not None:
self._fuzz_data_logger.log_info(
"Type: {0}. Case {1} of {2} overall.".format(
type(self.fuzz_node.mutant).__name__,
self.total_mutant_index,
self.total_num_mutations,
)
)
else:
self._fuzz_data_logger.log_info(
"Type: {0}".format(
type(self.fuzz_node.mutant).__name__,
)
)
try:
self._open_connection_keep_trying(target)
self._pre_send(target)
for e in mutation_context.message_path[:-1]:
if self.continue_case:
prev_node = self.nodes[e.src]
node: Request = self.nodes[e.dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
callback_data = self._callback_current_node(node=node, edge=e, test_case_context=protocol_session)
if self.continue_case:
self._fuzz_data_logger.open_test_step("Transmit Prep Node '{0}'".format(node.name))
self.fragmentation_check(target, node, e, callback_data=callback_data,
mutation_context=mutation_context, transmit_type="normal")
prev_node = self.nodes[mutation_context.message_path[-1].src]
node = self.nodes[mutation_context.message_path[-1].dst]
protocol_session = ProtocolSession(
previous_message=prev_node,
current_message=node,
)
mutation_context.protocol_session = protocol_session
callback_data = self._callback_current_node(
node=self.fuzz_node, edge=mutation_context.message_path[-1], test_case_context=protocol_session
)
if self.continue_case:
self._fuzz_data_logger.open_test_step(f"Fuzzing Node '{self.fuzz_node.name}'")
self._fuzz_data_logger.open_test_step(f"Fuzzing Primitive '{self.fuzz_node.mutant.qualified_name}'")
self.fragmentation_check(
target,
self.fuzz_node,
mutation_context.message_path[-1],
callback_data=callback_data,
mutation_context=mutation_context,
transmit_type="fuzz"
)
self._check_for_passively_detected_failures(target=target)
if not self._reuse_target_connection:
target.close()
if self.sleep_time > 0:
self._fuzz_data_logger.open_test_step("Sleep between tests.")
self._sleep(self.sleep_time)
except BoofuzzFailure as e:
self._fuzz_data_logger.log_fail(e.message)
self._check_for_passively_detected_failures(target=target, failure_already_detected=True)
finally:
self._process_failures(target=target)
self._fuzz_data_logger.close_test_case()
self.export_file()
[docs]
def _open_connection_keep_trying(self, target: Target):
"""Open connection and if it fails, keep retrying.
Args:
target (Target): Target to open.
"""
if not self._reuse_target_connection:
out_of_available_sockets_count = 0
unable_to_connect_count = 0
initial_time = time.time()
while True:
try:
target.open()
break # break if no exception
except exception.BoofuzzTargetConnectionFailedError:
if self.restart_threshold and unable_to_connect_count >= self.restart_threshold:
self._fuzz_data_logger.log_info(
"Unable to reconnect to target: Reached threshold of {0} retries. Ending fuzzing.".format(
self.restart_threshold
)
)
raise
elif self.restart_timeout and time.time() >= initial_time + self.restart_timeout:
self._fuzz_data_logger.log_info(
"Unable to reconnect to target: Reached restart timeout of {0}s. Ending fuzzing.".format(
self.restart_timeout
)
)
raise
else:
self._fuzz_data_logger.log_info(constants.WARN_CONN_FAILED_TERMINAL)
self._restart_target(target)
unable_to_connect_count += 1
except exception.BoofuzzOutOfAvailableSockets:
out_of_available_sockets_count += 1
if out_of_available_sockets_count == 50:
raise exception.BoofuzzError("There are no available sockets. Ending fuzzing.")
self._fuzz_data_logger.log_info("There are no available sockets. Waiting for another 5 seconds.")
time.sleep(5)
[docs]
def _sleep(self, seconds):
self._fuzz_data_logger.log_info("sleeping for %f seconds" % seconds)
time.sleep(seconds)
[docs]
def _test_case_name_feature_check(self, mutation_context):
message_path = self._message_path_to_str(mutation_context.message_path)
return "FEATURE-CHECK->{0}".format(message_path)
[docs]
def _test_case_name(self, mutation_context):
"""Get long test case name.
Args:
mutation_context (MutationContext): MutationContext to get name from.
Returns:
Long formatted test case name
"""
message_path = self._message_path_to_str(mutation_context.message_path)
mutation_names = (
"{0}:{1}".format(qualified_name, mutation.index)
for qualified_name, mutation in mutation_context.mutations.items()
)
ret = "{0}:[{1}]".format(message_path, ", ".join(mutation_names))
return ret + f' round_type={self.round_type} seed_index={self.seed_index} seed="{self.seed}"'
[docs]
def _message_path_to_str(self, message_path):
"""
Converts a message path, iterable, to a string.
Uses the dst key to get the name of the node.
-> is used to separate the nodes.
"""
return "->".join([self.nodes[e.dst].name for e in message_path])
[docs]
def test_case_data(self, index):
"""Return test case data object (for use by web server)
Args:
index (int): Test case index
Returns:
DataTestCase: Test case data object
"""
return self._db_logger.get_test_case_data(index=index)
[docs]
def get_fuzz_data_logger(self):
""" Getter for the data logger.
The data logger can be used to add log to the database.
You can choose the log type (error, info, ...)
"""
return self._fuzz_data_logger