Skip to content

PgSummary

All of pg_summary's functionality can be accessed by the PgSummary object, which includes all the methods and attributes mentioned in the sections below.

pg_summary.PgSummary

PgSummary(host: str, database: str, user: str, table_or_view: str, passwd: None | str = None, port: int = 5432, schema: str = 'public', outfile: Path | None = None, include_src: bool = False)

Create a summary of unique values for each column in a Postgres table or view and summarize results in an Excel workbook.

Parameters:

Name Type Description Default
host str

Name of the Postgres host.

required
database str

Name of the Postgres database.

required
user str

Name of the Postgres user.

required
table_or_view str

Name of the Postgres table or view to summarize.

required
passwd str

Password for the Postgres user. If not defined, the user will be prompted for the password. Defaults to None.

None
port int

Port number of the Postgres host. Defaults to 5432.

5432
schema str

Name of the Postgres schema containing the table or view to summarize. Defaults to "public".

'public'
outfile Path

Path to the output Excel file containing summarized results. If not defined, the output file will be named PgSummary_{table_or_view}.xlsx. Defaults to None.

None
include_src bool

Include the source table or view in the output Excel file as a separate sheet. Defaults to False.

False

Raises:

Type Description
ValueError

If the schema does not exist.

ValueError

If the table or view does not exist.

ValueError

If the table or view has no rows.

Examples:

>>> from pg_summary import PgSummary
...
... PgSummary(
...     host="localhost",
...     database="mydb",
...     user="username",
...     table_or_view="mytable",
...     schema="public",
...     include_src=True,
... ).summary()
Source code in pg_summary/pg_summary.py
def __init__(
    self: PgSummary,
    host: str,
    database: str,
    user: str,
    table_or_view: str,
    passwd: None | str = None,
    port: int = 5432,
    schema: str = "public",
    outfile: Path | None = None,
    include_src: bool = False,
) -> None:
    """Set up a summary of the unique values in each column of a table or view.

    The summary itself is produced by calling ``summary()``, which writes the
    results to an Excel workbook.

    Args:
        host (str): Name of the Postgres host.
        database (str): Name of the Postgres database.
        user (str): Name of the Postgres user.
        table_or_view (str): Name of the Postgres table or view to summarize.
        passwd (str, optional): Password for the Postgres user. If not
            defined, the user will be prompted for the password.
            Defaults to None.
        port (int, optional): Port number of the Postgres host.
            Defaults to 5432.
        schema (str, optional): Name of the Postgres schema containing the
            table or view to summarize. Defaults to "public".
        outfile (Path, optional): Path to the output Excel file containing
            summarized results. If not defined, the output file will be named
            `PgSummary_{table_or_view}.xlsx`. Defaults to None.
        include_src (bool, optional): Include the source table or view in the
            output Excel file as a separate sheet. Defaults to False.

    Raises:
        ValueError: If the schema does not exist.
        ValueError: If the table or view does not exist.
        ValueError: If the table or view has no rows.

    Examples:
        >>> from pg_summary import PgSummary
        >>> PgSummary(
        ...     host="localhost",
        ...     database="mydb",
        ...     user="username",
        ...     table_or_view="mytable",
        ...     schema="public",
        ...     include_src=True,
        ... ).summary()  # doctest: +SKIP
    """
    # Connection handle first; the remaining attributes are plain settings.
    self.db = PostgresDB(host, database, user, port, passwd)
    self.table_or_view = table_or_view
    self.schema = schema
    self.include_src = include_src
    self.outfile = outfile
    # Log the configured instance before validating it, so a failed
    # validation still leaves a trace of what was requested.
    logger.debug(f"{self!s}")
    self._validate_args()

get_column_names

get_column_names() -> list

Return a list of column names in a table or view.

Source code in pg_summary/pg_summary.py
def get_column_names(self: PgSummary) -> list:
    """List the column names of the configured table or view.

    Names are read from ``information_schema.columns`` and returned in
    ordinal (definition) order.
    """
    template = """
        SELECT column_name
        FROM information_schema.columns
        WHERE table_schema = {schema}
        AND table_name = {table}
        ORDER BY ordinal_position;
    """
    query = SQL(template).format(
        schema=Literal(self.schema),
        table=Literal(self.table_or_view),
    )
    # The first element of the query result iterates over the row mappings.
    records = self.db.rowdict(query)[0]
    return [record["column_name"] for record in records]

get_column_dtype

get_column_dtype(column: str) -> str

Return the data type of a column.

Source code in pg_summary/pg_summary.py
def get_column_dtype(self: PgSummary, column: str) -> str:
    """Look up the data type of one column of the table or view.

    When the column has a ``character_maximum_length`` the length is appended
    in parentheses after the type name.

    Args:
        column (str): Name of the column to look up.

    Returns:
        str: The column's data type.

    Raises:
        ValueError: If the column does not exist in the table or view.
    """
    missing_msg = (
        f"Column {column} does not exist in {self.schema}.{self.table_or_view}."
    )
    if not self._table_column_exists(column):
        raise ValueError(missing_msg)

    template = """
        SELECT
            column_name,
            case
                when character_maximum_length is null
                then data_type
                else data_type || '(' || character_maximum_length || ')'
                end as data_type
        FROM information_schema.columns
        WHERE table_schema = {schema}
        AND table_name = {table}
        AND column_name = {column};
    """
    query = SQL(template).format(
        schema=Literal(self.schema),
        table=Literal(self.table_or_view),
        column=Literal(column),
    )
    result = self.db.rowdict(query)
    # The last element of the result is the row count; zero rows means the
    # catalog has no entry for this column.
    row_count = result[-1]
    if row_count == 0:
        raise ValueError(missing_msg)
    first_row = next(result[0])
    return first_row["data_type"]

get_unique_column_values

get_unique_column_values(column: str) -> tuple

Return a list of unique values for a column.

Parameters:

Name Type Description Default
column str

Name of the column to query for unique values.

required

Returns:

Name Type Description
tuple tuple

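A tuple of (rows, headers, row count) as returned by the database query: the first item iterates over one row per unique non-null value, and the last item is the number of unique values.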

Source code in pg_summary/pg_summary.py
def get_unique_column_values(self: PgSummary, column: str) -> tuple:
    """Query the distinct, non-null values of a column in sorted order.

    Args:
        column (str): Name of the column to query for unique values.

    Returns:
        tuple: The result of the database query. Its first item iterates over
            one row mapping per unique value (keyed by the column name) and
            its last item is the number of unique values.

    Raises:
        ValueError: If the column does not exist in the table or view.
    """
    column_found = self._table_column_exists(column)
    if not column_found:
        raise ValueError(
            f"Column {column} does not exist in {self.schema}.{self.table_or_view}.",
        )

    template = """
        SELECT DISTINCT {column}
        FROM {schema}.{table}
        WHERE {column} IS NOT NULL
        ORDER BY {column};
    """
    # Identifiers (not literals) so the names are quoted as SQL object names.
    placeholders = {
        "column": Identifier(column),
        "schema": Identifier(self.schema),
        "table": Identifier(self.table_or_view),
    }
    query = SQL(template).format(**placeholders)
    return self.db.rowdict(query)

get_null_column_value_count

get_null_column_value_count(column: str) -> int

Return the number of null values for a column.

Parameters:

Name Type Description Default
column str

Name of the column to query for null values.

required

Returns:

Name Type Description
int int

The number of null values for the column.

Source code in pg_summary/pg_summary.py
def get_null_column_value_count(self: PgSummary, column: str) -> int:
    """Count the rows in which a column is NULL.

    Args:
        column (str): Name of the column to query for null values.

    Returns:
        int: The number of null values for the column.

    Raises:
        ValueError: If the column does not exist in the table or view.
    """
    column_found = self._table_column_exists(column)
    if not column_found:
        raise ValueError(
            f"Column {column} does not exist in {self.schema}.{self.table_or_view}.",
        )

    template = """
        SELECT COUNT(*)
        FROM {schema}.{table}
        WHERE {column} IS NULL;
    """
    placeholders = {
        "column": Identifier(column),
        "schema": Identifier(self.schema),
        "table": Identifier(self.table_or_view),
    }
    query = SQL(template).format(**placeholders)
    # The query yields a single row whose "count" key holds the result.
    rows = self.db.rowdict(query)[0]
    only_row = next(rows)
    return only_row["count"]

get_table_row_count

get_table_row_count() -> int

Return the number of rows in a table.

Returns:

Name Type Description
int int

The number of rows in the table.

Source code in pg_summary/pg_summary.py
def get_table_row_count(self: PgSummary) -> int:
    """Count all rows of the configured table or view.

    Returns:
        int: The number of rows in the table or view.
    """
    template = """
        SELECT COUNT(*)
        FROM {schema}.{table};
    """
    query = SQL(template).format(
        schema=Identifier(self.schema),
        table=Identifier(self.table_or_view),
    )
    # The query yields a single row whose "count" key holds the result.
    rows = self.db.rowdict(query)[0]
    only_row = next(rows)
    return only_row["count"]

summary

summary() -> None

Create a summary of unique values for each column in a Postgres table or view and summarize results in an Excel workbook.

Source code in pg_summary/pg_summary.py
def summary(self: PgSummary) -> None:
    """Write a per-column summary of the table or view to an Excel workbook.

    The first sheet is laid out as follows:

    * rows 1-2: host, database, schema, table/view name and total row count;
    * row 4: number of unique values per column;
    * row 5: number of null values per column;
    * row 6: data type per column;
    * row 7: column names (styled, with an auto-filter);
    * rows 8+: the sorted unique non-null values of each column.

    When ``include_src`` is set, the full contents of the table or view are
    written to a second sheet. The workbook is saved to ``self.outfile``.
    """

    def autosize(ws) -> None:
        """Set each column's width to the length of its longest cell text."""
        for cells in ws.columns:
            width = max(len(str(cell.value)) for cell in cells)
            ws.column_dimensions[get_column_letter(cells[0].column)].width = width

    logger.info(f"Creating summary table for {self.schema}.{self.table_or_view}")
    wb = openpyxl.Workbook()
    sheet = wb.active
    if not sheet:
        sheet = wb.create_sheet()
    sheet.title = f"{self.__class__.__name__}"

    # Summary header: labels in row 1, values in row 2, starting at column B.
    header = (
        ("Host", self.db.host),
        ("Database", self.db.database),
        ("Schema", self.schema),
        ("Table/View", self.table_or_view),
        ("Total Rows", self.get_table_row_count()),
    )
    for col_idx, (label, value) in enumerate(header, start=2):
        label_cell = sheet.cell(row=1, column=col_idx)
        label_cell.value = label
        label_cell.font = FONT
        sheet.cell(row=2, column=col_idx).value = value

    # Summary results
    column_names = self.get_column_names()
    if column_names:
        # Row labels in column A, written only when there is something to label.
        row_labels = (
            "# of unique values",
            "# of null values",
            "data type",
            "column name",
        )
        for row_idx, label in enumerate(row_labels, start=4):
            label_cell = sheet.cell(row=row_idx, column=1)
            label_cell.value = label
            label_cell.font = FONT

    # Last populated row of the value area; row 7 is the column-name row.
    last_row = 7
    for col_idx, name in enumerate(column_names, start=2):
        # Number of null values
        sheet.cell(row=5, column=col_idx).value = self.get_null_column_value_count(
            name,
        )
        # Column data type
        sheet.cell(row=6, column=col_idx).value = self.get_column_dtype(name)
        # Column name, styled as the table header
        name_cell = sheet.cell(row=7, column=col_idx)
        name_cell.value = name
        name_cell.font = FONT
        name_cell.border = BORDER
        name_cell.fill = FILL
        # Query the unique values once and consume the rows immediately, with
        # no other query in between, in case the rows are fetched lazily.
        unique = self.get_unique_column_values(name)
        # Number of unique values
        sheet.cell(row=4, column=col_idx).value = unique[-1]
        # Unique column values
        for row_idx, record in enumerate(unique[0], start=8):
            sheet.cell(row=row_idx, column=col_idx).value = record[name]
            last_row = max(last_row, row_idx)

    if column_names:
        # Filter over the column-name row down to the longest value list, so
        # a column without non-null values no longer breaks or shrinks it.
        last_letter = get_column_letter(len(column_names) + 1)
        sheet.auto_filter.ref = f"B7:{last_letter}{last_row}"

    # Auto size all columns
    autosize(sheet)

    # Write the source table or view to a new sheet
    if self.include_src:
        logger.debug(
            f"Writing source table or view to sheet {self.schema}.{self.table_or_view}",
        )
        sheet = wb.create_sheet(title=f"{self.schema}.{self.table_or_view}")
        sql = SQL("SELECT * FROM {schema}.{table};").format(
            schema=Identifier(self.schema),
            table=Identifier(self.table_or_view),
        )
        data, headers, _rowcount = self.db.rowdict(sql)
        for col_idx, header_name in enumerate(headers, start=1):
            header_cell = sheet.cell(row=1, column=col_idx)
            header_cell.value = header_name
            header_cell.font = FONT
            header_cell.border = BORDER
            header_cell.fill = FILL
        last_src_row = 1
        for row_idx, record in enumerate(data, start=2):
            for col_idx, val in enumerate(record.values(), start=1):
                sheet.cell(row=row_idx, column=col_idx).value = val
            last_src_row = row_idx
        # Auto size all columns
        autosize(sheet)
        # Turn on filtering from the header row down to the last data row.
        # Skipped when there are no headers: column index 0 has no letter.
        if headers:
            last_letter = get_column_letter(len(headers))
            sheet.auto_filter.ref = f"A1:{last_letter}{last_src_row}"

    wb.save(self.outfile)
    logger.info(f"Summary complete. Saving output to {self.outfile}")