import collections
import datetime
import sqlite3
import sys
import deprecated
from boofuzz import data_test_case, data_test_step, exception, helpers
from boofuzz.loggers.ifuzz_logger_backend import IFuzzLoggerBackend
# Python 3 has no ``buffer`` builtin: alias the name to ``memoryview`` so the
# rest of this module can refer to ``buffer`` on either major version.
if sys.version_info[0] >= 3:
    buffer = memoryview
def hex_to_hexstr(input_bytes):
    """
    Render input_bytes as a printable hex string.

    The formatting is delegated entirely to ``helpers.hex_str``.
    NOTE(review): an earlier docstring promised a trailing best-effort utf-8
    rendering; whether ``helpers.hex_str`` provides that is not visible here.

    :param input_bytes: Arbitrary bytes.
    :return: Printable string.
    """
    rendered = helpers.hex_str(input_bytes)
    return rendered


# Module-level alias for the default bytes-to-string renderer.
DEFAULT_HEX_TO_STR = hex_to_hexstr
def get_time_stamp():
    """
    Return the current UTC time as an ISO 8601 formatted string.

    The value comes from a naive ``datetime`` (``utcnow``), so the string
    carries no UTC offset suffix.

    :return: Timestamp string such as ``2024-01-31T12:34:56.789012``.
    """
    return datetime.datetime.utcnow().isoformat()
[docs]
@deprecated.deprecated(version='1.0', reason='Use FuzzLoggerPostgres instead')
class FuzzLoggerDb(IFuzzLoggerBackend):
    """
    Deprecated in favor of `FuzzLoggerPostgres`.

    This is the old way to log in a db: a sqlite database file.

    Old docstring:
    Log fuzz data in a sqlite database file.
    Using an existing database requires more graceful exits to prevent case number duplication.

    Log entries are queued in memory and only reach the database through ``_write_log``.

    :param db_filename: Path handed to ``sqlite3.connect``. The ``cases`` and ``steps`` tables are
        created only when ``helpers.path_exists`` reports that the file did not exist beforehand.
    :param num_log_cases: When greater than 0, queued entries belonging to test cases that are at
        least this many indexes behind the current one are discarded, and the queue is written only
        on a failure/error, for the very first write, or when forced by ``close_test``. When 0
        (default) or negative, the queue is written every time ``_write_log`` runs.
    """

    _INSERT_CASE_SQL = "INSERT INTO cases VALUES(?, ?, ?, ?, ?, ?);\n"
    _INSERT_STEP_SQL = "INSERT INTO steps VALUES(?, ?, ?, ?, ?, ?);\n"
    # ``log_recv`` stores the step type "receive"; the legacy "recv" spelling is
    # kept so the previous check still matches.
    _TRUNCATABLE_STEP_TYPES = ("send", "receive", "recv")

    def __init__(self, db_filename, num_log_cases=0):
        # Check if a db exists before the connect leaves an "empty" file behind.
        db_already_exist = helpers.path_exists(db_filename)
        self._database_connection = sqlite3.connect(db_filename, check_same_thread=False)
        self._db_cursor = self._database_connection.cursor()
        if not db_already_exist:
            self._db_cursor.execute("""CREATE TABLE cases (name text, number integer, round_type text,
seed text, seed_index integer, timestamp TEXT)""")
            self._db_cursor.execute(
                """CREATE TABLE steps (test_case_index integer, type text, description text, data blob,
timestamp TEXT, is_truncated BOOLEAN)"""
            )
        self._current_test_case_index = 0
        self._queue = collections.deque([])  # Queue that holds last n test cases before committing
        self._queue_max_len = num_log_cases
        self._fail_detected = False
        self._log_first_case = True
        self._data_truncate_length = 512

    @staticmethod
    def _blob_to_bytes(data):
        """Return ``data`` as a byte string when it is a ``buffer`` object, otherwise unchanged."""
        # BLOB becomes type buffer in py2 and bytes in py3. ``bytes`` is ``str``
        # on py2, so the result is str on py2 and bytes on py3.
        try:
            if isinstance(data, buffer):
                return bytes(data)
        except NameError:
            # ``buffer`` is not defined on this interpreter; keep the data as is.
            pass
        return data

    def get_test_case_data(self, index):
        """
        Load one test case and its steps from the database.

        Only entries that were already written are visible; entries still
        sitting in the in-memory queue are not returned.

        :param index: Value matched against ``cases.number`` and ``steps.test_case_index``.
        :return: ``data_test_case.DataTestCase``, or None when no case row matches.
        """
        c = self._database_connection.cursor()
        # Name the columns explicitly: with the 6-column ``cases`` schema the
        # timestamp is not the third column, so positional ``SELECT *`` access
        # would return ``round_type`` instead.
        test_case_row = c.execute(
            """SELECT name, number, timestamp FROM cases WHERE number=?""", [index]
        ).fetchone()
        if test_case_row is None:
            return None
        rows = c.execute("""SELECT * FROM steps WHERE test_case_index=?""", [index])
        steps = [
            data_test_step.DataTestStep(
                type=row[1],
                description=row[2],
                data=self._blob_to_bytes(row[3]),
                timestamp=row[4],
                truncated=row[5],
            )
            for row in rows
        ]
        return data_test_case.DataTestCase(
            name=test_case_row[0], index=test_case_row[1], timestamp=test_case_row[2], steps=steps
        )

    def _queue_step(self, step_type, description="", data=b""):
        """Queue one ``steps`` row for the current test case, stamped with the current time."""
        # Entries are lists (not tuples) because ``_truncate_send_recv`` edits them in place.
        self._queue.append(
            [
                self._INSERT_STEP_SQL,
                self._current_test_case_index,
                step_type,
                description,
                data,
                helpers.get_time_stamp(),
                False,
            ]
        )

    def open_test_case(self, test_case_id, name, index, round_type=None, seed=None, seed_index=None, *args,
                       **kwargs):
        """
        Queue a ``cases`` row and make ``index`` the current test case index.

        ``test_case_id`` and any extra positional/keyword arguments are accepted but not stored.
        """
        self._queue.append(
            [self._INSERT_CASE_SQL, name, index, round_type, seed, seed_index, helpers.get_time_stamp()]
        )
        self._current_test_case_index = index

    def open_test_step(self, description):
        """Queue a "step" entry."""
        self._queue_step("step", description)

    def log_check(self, description):
        """Queue a "check" entry."""
        self._queue_step("check", description)

    def log_error(self, description):
        """Queue an "error" entry, mark the case as failed and write the queue immediately."""
        self._queue_step("error", description)
        self._fail_detected = True
        self._write_log()

    def log_recv(self, data):
        """Queue a "receive" entry holding ``data`` as a blob."""
        self._queue_step("receive", "", buffer(data))

    def log_send(self, data):
        """Queue a "send" entry holding ``data`` as a blob."""
        self._queue_step("send", "", buffer(data))

    def log_info(self, description):
        """Queue an "info" entry."""
        self._queue_step("info", description)

    def log_fail(self, description=""):
        """Queue a "fail" entry and mark the case as failed (written on the next ``_write_log``)."""
        self._queue_step("fail", description)
        self._fail_detected = True

    def log_target_warn(self, description=""):
        """Queue a "target-warn" entry."""
        self._queue_step("target-warn", description)

    def log_target_error(self, description=""):
        """Queue a "target-error" entry."""
        self._queue_step("target-error", description)

    def log_pass(self, description=""):
        """Queue a "pass" entry."""
        self._queue_step("pass", description)

    def close_test_case(self):
        """Give the queue a chance to be written; whether it is depends on ``_write_log`` rules."""
        self._write_log(force=False)

    def close_test(self):
        """Write whatever is still queued."""
        self._write_log(force=True)

    def _entry_case_index(self, entry):
        """Return the test case index stored in a queued entry."""
        # Case rows are [sql, name, index, ...]; step rows are [sql, index, ...].
        if entry[0] == self._INSERT_CASE_SQL:
            return entry[2]
        return entry[1]

    def _write_log(self, force=False):
        """
        Prune the queue and, when warranted, write it to the database and commit.

        The queue is written when ``force`` is set, when a failure/error was
        logged since the last write, on the very first write, or always when
        ``num_log_cases`` is not positive.

        :param force: Write regardless of failure state.
        """
        if not self._queue:
            return
        if self._queue_max_len > 0:
            # Drop entries of test cases that fell out of the retention window.
            # The emptiness guard keeps the loop from indexing an empty deque.
            while self._queue and (
                self._current_test_case_index - self._entry_case_index(self._queue[0])
            ) >= self._queue_max_len:
                self._queue.popleft()
        else:
            force = True
        if force or self._fail_detected or self._log_first_case:
            for query in self._queue:
                # abbreviate long entries first; full data is kept when a failure was detected
                if not self._fail_detected:
                    self._truncate_send_recv(query)
                self._db_cursor.execute(query[0], query[1:])
            self._queue.clear()
            self._database_connection.commit()
            self._log_first_case = False
            self._fail_detected = False

    def _truncate_send_recv(self, query):
        """
        Shorten the blob of a queued send/receive step in place and flag it as truncated.

        Entries of any other type, and blobs no longer than
        ``self._data_truncate_length``, are left untouched.
        """
        if query[2] not in self._TRUNCATABLE_STEP_TYPES:
            return
        if len(query[4]) <= self._data_truncate_length:
            return
        query[6] = True
        query[4] = buffer(query[4][: self._data_truncate_length])
class FuzzLoggerDbReader:
    """Read fuzz data saved using FuzzLoggerDb

    Every read uses its own cursor, so a result returned by ``query`` can be
    iterated while other methods of the same reader are called.

    Args:
        db_filename (str): Name of database file to read.
    """

    def __init__(self, db_filename):
        self._database_connection = sqlite3.connect(db_filename, check_same_thread=False)
        # Kept as an attribute for backward compatibility; the methods below
        # open a fresh cursor per call instead of sharing this one.
        self._db_cursor = self._database_connection.cursor()

    @staticmethod
    def _blob_to_bytes(data):
        """Return ``data`` as a byte string when it is a ``buffer`` object, otherwise unchanged."""
        # BLOB becomes type buffer in py2 and bytes in py3. ``bytes`` is ``str``
        # on py2, so the result is str on py2 and bytes on py3.
        try:
            if isinstance(data, buffer):
                return bytes(data)
        except NameError:
            # ``buffer`` is not defined on this interpreter; keep the data as is.
            pass
        return data

    def get_test_case_data(self, index):
        """Load one test case and its steps from the database.

        Args:
            index: Value matched against ``cases.number`` and ``steps.test_case_index``.

        Returns:
            data_test_case.DataTestCase built from the matching rows.

        Raises:
            exception.BoofuzzNoSuchTestCase: If no case row matches ``index``.
        """
        c = self._database_connection.cursor()
        # Name the columns explicitly: depending on the schema version the
        # timestamp is not necessarily the third column of ``cases``.
        test_case_row = c.execute(
            """SELECT name, number, timestamp FROM cases WHERE number=?""", [index]
        ).fetchone()
        if test_case_row is None:
            raise exception.BoofuzzNoSuchTestCase()
        rows = c.execute("""SELECT * FROM steps WHERE test_case_index=?""", [index])
        steps = [
            data_test_step.DataTestStep(
                type=row[1],
                description=row[2],
                data=self._blob_to_bytes(row[3]),
                timestamp=row[4],
                truncated=row[5],
            )
            for row in rows
        ]
        return data_test_case.DataTestCase(
            name=test_case_row[0], index=test_case_row[1], timestamp=test_case_row[2], steps=steps
        )

    def query(self, query, params=None):
        """Execute an arbitrary SQL statement against the database.

        Args:
            query (str): SQL text passed to ``cursor.execute``.
            params: Optional sequence of bound parameters; defaults to none.

        Returns:
            The sqlite3 cursor the statement was executed on (iterable over result rows).
        """
        if params is None:
            params = []
        return self._database_connection.cursor().execute(query, params)

    @property
    def failure_map(self):
        """Map each test case index to the descriptions of its "fail" steps.

        Returns:
            collections.defaultdict(list): ``{test_case_index: [description, ...]}``.
        """
        c = self._database_connection.cursor()
        # Bind the value instead of writing type="fail": a double-quoted token
        # is an identifier in standard SQL and only works through a SQLite quirk.
        failure_steps = c.execute(
            """SELECT test_case_index, description FROM steps WHERE type=?""", ["fail"]
        )
        failure_map = collections.defaultdict(list)
        for case_index, description in failure_steps:
            failure_map[case_index].append(description)
        return failure_map