import collections
import datetime
import psycopg
import psycopg.sql
import os
import subprocess
from typing import Generator
from colorama import Fore, Style
import boofuzz.constants
from boofuzz import data_test_case, data_test_step
from boofuzz.loggers.ifuzz_logger_backend import IFuzzLoggerBackend
type Path = str
def get_time_stamp():
return datetime.datetime.now(datetime.UTC).isoformat()
def get_db_socket_path() -> Path:
"""Return the absolute path of the unix socket"""
inside_docker = os.getenv('INSIDE_DOCKER', False)
if inside_docker:
abs_db_socket_path = '/var/run/postgresql/'
else:
# Get this file path
file_path = os.path.abspath(__file__)
# Strip the file name and the last two directory from the path
dir_path = os.path.dirname(os.path.dirname(os.path.dirname(file_path)))
# Add db_socket to the path
abs_db_socket_path = os.path.join(dir_path, "db_socket")
return abs_db_socket_path
def _get_test_case_data(database_connection: psycopg.Connection, table_cases_name: str, table_steps_name: str,
index: int) -> data_test_case.DataTestCase | None:
c = database_connection.cursor()
c.execute(
psycopg.sql.SQL(
"""SELECT name, number, timestamp FROM {} WHERE number=%s LIMIT 1"""
).format(psycopg.sql.Identifier(table_cases_name)),
(
index,
)
)
test_case_row = c.fetchone()
if test_case_row is None:
return None
c.execute(
psycopg.sql.SQL(
"""SELECT type, description, data, timestamp, is_truncated FROM {}
WHERE test_case_index=%s"""
).format(psycopg.sql.Identifier(table_steps_name)),
(
index,
)
)
rows = c.fetchall()
steps = []
for row in rows:
steps.append(
data_test_step.DataTestStep(
type=row[0], description=row[1], data=row[2], timestamp=row[3].isoformat(), truncated=row[4]
)
)
return data_test_case.DataTestCase(
name=test_case_row[0], index=test_case_row[1], timestamp=test_case_row[2].isoformat(), steps=steps
)
def verify_name_len(db_name: str, db_table_name: str | None):
"""Verify that len of identifiers are good for postgres."""
if len(db_name) > boofuzz.constants.DB_MAX_IDENTIFIERS_LEN:
raise Exception(f'len of db_name boofuzz.loggers.fuzz_logger_postgres.'
f'{len(db_name)=}')
if db_table_name is not None and len(db_table_name) > boofuzz.constants.DB_MAX_IDENTIFIERS_LEN - 6:
# minus 6 because max(len('_cases'), len('_steps')) = 6
raise Exception(f'len of db_table_name too long in boofuzz.loggers.fuzz_logger_postgres.'
f'{len(db_table_name)=}')
[docs]
class FuzzLoggerPostgres(IFuzzLoggerBackend):
"""
Log fuzz data in a PostgreSQL database.
Using an existing database requires more graceful exits to prevent case number duplication.
Args:
db_name (str): Name of the database.
db_table_name (str | None): Name of tables in database. Default "".
num_log_cases (int): Minimize disk usage by only saving passing test cases
if they are in the n test cases preceding a failure or error.
Set to 0 to save after every test case (high disk I/O!). Default 0.
"""
def __init__(self, db_name: str, db_table_name: str | None = None, num_log_cases=0):
verify_name_len(db_name, db_table_name)
abs_db_socket_path = get_db_socket_path()
db_connection = psycopg.connect(
host=abs_db_socket_path,
dbname=boofuzz.constants.DB_DEFAULT_NAME,
user=boofuzz.constants.DB_USER_NAME,
password=boofuzz.constants.DB_PASSWORD
)
db_connection.autocommit = True
db_cursor = db_connection.cursor()
try:
db_cursor.execute(
psycopg.sql.SQL(
"""CREATE DATABASE {}"""
).format(psycopg.sql.Identifier(db_name))
)
except psycopg.errors.DuplicateDatabase:
pass
finally:
db_cursor.close()
del db_cursor
db_connection.close()
del db_connection
self._db_connection = psycopg.connect(
host=abs_db_socket_path,
dbname=db_name,
user=boofuzz.constants.DB_USER_NAME,
password=boofuzz.constants.DB_PASSWORD
)
self._db_cursor = self._db_connection.cursor()
self._db_cursor.execute("SET timezone = 'UTC'")
self._table_cases_name = 'cases' if db_table_name is None else db_table_name + '_cases'
self._db_cursor.execute(
psycopg.sql.SQL(
"""
CREATE TABLE IF NOT EXISTS {} (
id SERIAL NOT NULL PRIMARY KEY,
name TEXT NOT NULL,
number INTEGER NOT NULL,
round_type TEXT,
seed TEXT,
seed_index INTEGER,
timestamp TIMESTAMP NOT NULL)
"""
).format(psycopg.sql.Identifier(self._table_cases_name))
)
self._table_steps_name = 'steps' if db_table_name is None else db_table_name + '_steps'
self._db_cursor.execute(
psycopg.sql.SQL(
"""
CREATE TABLE IF NOT EXISTS {} (
id SERIAL NOT NULL PRIMARY KEY,
test_case_index INTEGER NOT NULL,
type TEXT NOT NULL,
description TEXT NOT NULL,
data BYTEA NOT NULL,
is_truncated BOOLEAN NOT NULL,
timestamp TIMESTAMP NOT NULL)
"""
).format(psycopg.sql.Identifier(self._table_steps_name))
)
self._db_connection.commit()
self._current_test_case_index = 0
self._queue = collections.deque([]) # Queue that holds last n test cases before commiting
self._queue_max_len = num_log_cases
self._problem_detected = False
self._log_first_case = True
self._data_truncate_length = 512
[docs]
def get_test_case_data(self, index: int) -> data_test_case.DataTestCase:
return _get_test_case_data(self._db_connection, self._table_cases_name, self._table_steps_name, index)
[docs]
def open_test_case(self, test_case_id, name, index, round_type=None, seed=None, seed_index=None, *args, **kwargs):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (name, number, round_type, seed, seed_index, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_cases_name)),
(
name,
index,
round_type,
seed,
seed_index,
get_time_stamp()
)
]
)
self._current_test_case_index = index
[docs]
def open_test_step(self, description):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"step",
description,
b"",
False,
get_time_stamp()
)
]
)
[docs]
def log_check(self, description):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"check",
description,
b"",
False,
get_time_stamp()
)
]
)
[docs]
def log_error(self, description):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"error",
description,
b"",
False,
get_time_stamp()
)
]
)
self._problem_detected = True
self._write_log()
[docs]
def log_recv(self, data):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
[
self._current_test_case_index,
"receive",
"",
memoryview(data),
False,
get_time_stamp()
] # List and not tuple because it might trunc the data
]
)
[docs]
def log_send(self, data):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
[
self._current_test_case_index,
"send",
"",
memoryview(data),
False,
get_time_stamp()
] # List and not tuple because it might trunc the data
]
)
[docs]
def log_info(self, description):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"info",
description,
b"",
False,
get_time_stamp()
)
]
)
[docs]
def log_fail(self, description=""):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"fail",
description,
b"",
False,
get_time_stamp()
)
]
)
self._problem_detected = True
[docs]
def log_target_warn(self, description=""):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"target-warn",
description,
b"",
False,
get_time_stamp()
)
]
)
self._problem_detected = True
[docs]
def log_target_error(self, description=""):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"target-error",
description,
b"",
False,
get_time_stamp()
)
]
)
self._problem_detected = True
[docs]
def log_pass(self, description=""):
self._queue.append(
[
psycopg.sql.SQL(
"""INSERT INTO {} (test_case_index, type, description, data, is_truncated, timestamp)
VALUES(%s, %s, %s, %s, %s, %s)"""
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
self._current_test_case_index,
"pass",
description,
b"",
False,
get_time_stamp()
)
]
)
[docs]
def close_test_case(self):
self._write_log(force=False)
[docs]
def close_test(self):
self._write_log(force=True)
def _write_log(self, force=False):
if len(self._queue) > 0:
if self._queue_max_len > 0:
while (
self._current_test_case_index - next(x for x in self._queue[0] if isinstance(x, int))
) >= self._queue_max_len:
self._queue.popleft()
else:
force = True
if force or self._problem_detected or self._log_first_case:
for query in self._queue:
# abbreviate long entries first
if not self._problem_detected:
self._truncate_send_recv(query)
self._db_cursor.execute(query[0], query[1])
self._queue.clear()
self._db_connection.commit()
self._log_first_case = False
self._problem_detected = False
def _truncate_send_recv(self, query):
if query[1][1] in ["send", "recv"] and len(query[1][3]) > self._data_truncate_length:
query[1][4] = True
query[1][3] = memoryview(query[1][3][: self._data_truncate_length])
class FuzzLoggerPostgresReader:
"""Read fuzz data saved using FuzzLoggerPostgres
Args:
db_table_name (str): Name of table in database.
"""
def __init__(self, db_name: str, db_table_name: str | None = None):
verify_name_len(db_name, db_table_name)
abs_db_socket_path = get_db_socket_path()
self._db_connection = psycopg.connect(
host=abs_db_socket_path,
dbname=db_name,
user=boofuzz.constants.DB_USER_NAME,
password=boofuzz.constants.DB_PASSWORD
)
self._db_cursor = self._db_connection.cursor()
self._db_cursor.execute("SET timezone = 'UTC'")
self._db_connection.commit()
self._table_cases_name = 'cases' if db_table_name is None else db_table_name + '_cases'
self._table_steps_name = 'steps' if db_table_name is None else db_table_name + '_steps'
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self._db_cursor.close()
self._db_connection.close()
def get_test_case_data(self, index: int) -> data_test_case.DataTestCase:
return _get_test_case_data(self._db_connection, self._table_cases_name, self._table_steps_name, index)
def get_data_for_continue_command(self) -> (str, int, int):
self._db_cursor.execute(
psycopg.sql.SQL(
"""SELECT round_type, seed_index FROM {} ORDER BY number DESC LIMIT 1"""
).format(psycopg.sql.Identifier(self._table_cases_name))
)
round_type, seed_index = self._db_cursor.fetchone()
self._db_cursor.execute(
psycopg.sql.SQL(
"""SELECT number FROM {} WHERE round_type = %s AND seed_index = %s ORDER BY number ASC LIMIT 1"""
).format(psycopg.sql.Identifier(self._table_cases_name)),
(
round_type,
seed_index
)
)
mutant_index = self._db_cursor.fetchone()[0] # First mutant_index with this round_type and seed_index
return round_type, seed_index, mutant_index
def get_total_mutant_index(self) -> int:
self._db_cursor.execute(
psycopg.sql.SQL(
"""SELECT number FROM {} ORDER BY number DESC LIMIT 1"""
).format(psycopg.sql.Identifier(self._table_cases_name))
)
total_mutant_index = self._db_cursor.fetchone()[0]
return total_mutant_index
def get_datname_and_db_size(self) -> Generator[tuple[str, str], None, None]:
"""Return a generator of tuple which contains every database name : (db_name, db_size)"""
self._db_cursor.execute(
"""
SELECT datname,
pg_size_pretty(pg_database_size(datname)) as db_size
FROM pg_database
WHERE datistemplate = false
AND datname NOT IN ('fuzz', 'postgres')
ORDER BY datname;
"""
)
while (res := self._db_cursor.fetchone()) is not None:
yield res
@staticmethod
def open_cli_connection(db_name: str = None):
"""This function open a Postgres shell with psql"""
if db_name is None:
db_name = boofuzz.constants.DB_DEFAULT_NAME
verify_name_len(db_name, None)
abs_db_socket_path = get_db_socket_path()
print(f'The psql shell will open, use {Fore.BLUE}ctrl+d{Style.RESET_ALL} or '
f'{Fore.BLUE}\\q{Style.RESET_ALL} to quit.\n')
subprocess.run(['psql',
f'--host={abs_db_socket_path}',
f'--username={boofuzz.constants.DB_USER_NAME}',
f'--dbname={db_name}'])
# def query(self, query, params=None):
# if params is None:
# params = []
# c = self._db_cursor
# return c.execute(query, params)
#
@property
def failure_map(self):
self._db_cursor.execute(
psycopg.sql.SQL(
'''SELECT test_case_index, description FROM {} WHERE type=%s '''
).format(psycopg.sql.Identifier(self._table_steps_name)),
(
'fail',
)
)
failure_steps = self._db_cursor.fetchall()
failure_map = collections.defaultdict(list)
for step in failure_steps:
failure_map[step[0]].append(step[1])
return failure_map