
Waterfall

This part of the project documentation takes a task-oriented approach. Use it as a guide to accomplish common tasks with the Waterfall class methods described below.

Count

Count columns via the columns parameter in the __init__ of the PandasWaterfall or SparkWaterfall class. An example is provided below.

from waterfall_logging.log import PandasWaterfall, SparkWaterfall

pandas_w = PandasWaterfall(columns=['user_id'])

spark_w = SparkWaterfall(columns=['user_id'])
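For instance, a minimal sketch of column counting in practice, using the log and to_markdown methods described further below:

import pandas as pd

from waterfall_logging.log import PandasWaterfall

df = pd.DataFrame(data={'user_id': [1, 2, 2, 3]})

pandas_w = PandasWaterfall(columns=['user_id'])
pandas_w.log(table=df, reason='initial load', configuration_flag='example')

# Each `log` call appends one row holding the count for `user_id`, the row count
# ('Rows'), and their deltas (prefixed with 'Δ ') versus the previous entry.
print(pandas_w.to_markdown())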

Parameters:

| Name             | Type      | Description                                                                 | Default |
|:-----------------|:----------|:----------------------------------------------------------------------------|:--------|
| table_name       | str       | Specifies the name of the table to log                                       | None    |
| columns          | List[str] | Specifies which columns to log                                               | None    |
| distinct_columns | List[str] | Specifies which distinct column values to log                                | None    |
| delta_prefix     | str       | Prefix for column names with discrete difference (delta) with previous row   | 'Δ '    |
| dropna           | bool      | Whether to exclude NaN in the row counts                                     | False   |
| row_count_column | str       | Column name for an added column that counts rows in table                    | 'Rows'  |
Source code in waterfall_logging/log.py
def __init__(
    self,
    table_name: str | None = None,
    columns: List[str] | None = None,
    distinct_columns: List[str] | None = None,
    dropna: bool = False,
    delta_prefix: str = "Δ ",
    row_count_column: str = "Rows",
):
    """

    Args:
        table_name (str): Specifies the name of the table to log
        columns (List[str]): Specifies which columns to log
        distinct_columns (List[str]): Specifies which distinct column values to log
        delta_prefix (str): Prefix for column names with discrete difference (delta) with previous row
        dropna (bool): Whether to exclude NaN in the row counts
        row_count_column (str): Column name for an added column that counts rows in table

    """
    self.table_name = table_name
    self.columns = iterable_to_list(columns)
    self.distinct_columns = iterable_to_list(distinct_columns)
    self.delta_prefix = delta_prefix
    self.dropna = dropna
    self.row_count_column = row_count_column

    self._input_columns = self.columns + self.distinct_columns
    self._all_columns = self._input_columns.copy()
    if self.row_count_column:
        self._all_columns.append(self.row_count_column)
    self._static_columns = ["Table", "Reason", "Configurations flag"]
    self._log = None

    distinct_overwrite = set(self.columns).intersection(set(self.distinct_columns))
    if distinct_overwrite:
        warnings.warn(
            f"Column names in `distinct_columns` overwrite names in `columns` with distinct counts: "
            f"{distinct_overwrite}.",
        )

Count distinct

Count distinct columns via the distinct_columns parameter in the __init__ of the PandasWaterfall or SparkWaterfall class. An example is provided below.

from waterfall_logging.log import PandasWaterfall, SparkWaterfall

pandas_w = PandasWaterfall(distinct_columns=['user_id'])

spark_w = SparkWaterfall(distinct_columns=['user_id'])

Note

Column names in distinct_columns overwrite names in columns with distinct counts.
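For example, a minimal sketch that triggers the overwrite warning (emitted by the __init__ shown in the Count section above):

from waterfall_logging.log import PandasWaterfall

# 'user_id' appears in both lists, so its plain count is overwritten by the
# distinct count, and __init__ emits a UserWarning about the overwrite.
pandas_w = PandasWaterfall(columns=['user_id'], distinct_columns=['user_id'])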

Parameters and source code are identical to the __init__ shown in the Count section above.

Drop NaN

Drop NaN values in the (distinct) counts of your Pandas or Spark DataFrame.

from waterfall_logging.log import PandasWaterfall, SparkWaterfall

pandas_w = PandasWaterfall(columns=['A'], dropna=True)

spark_w = SparkWaterfall(columns=['A'], dropna=True)
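To illustrate, a minimal sketch, assuming dropna=True excludes the NaN entry from the logged counts as the parameter description states:

import pandas as pd

from waterfall_logging.log import PandasWaterfall

df = pd.DataFrame(data={'A': [1.0, 2.0, None]})

# With dropna=True, the NaN in column 'A' is excluded from the counts logged
# below; with the default dropna=False it would be included.
pandas_w = PandasWaterfall(columns=['A'], dropna=True)
pandas_w.log(table=df, reason='drop NaN example')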

Parameters and source code are identical to the __init__ shown in the Count section above.

Log Waterfall step

Logs a Pandas or Spark DataFrame.

import pandas as pd
import pyspark

from waterfall_logging.log import PandasWaterfall, SparkWaterfall

df = pd.DataFrame(data={'A': [0, 1, 2, 3, 4, 5], 'B': ['id1', 'id2', 'id3', 'id4', 'id4', 'id5']})

pandas_w = PandasWaterfall(columns=['A'], distinct_columns=['B'])
pandas_w.log(table=df, reason='example', configuration_flag='initial')

spark = pyspark.sql.SparkSession.builder.enableHiveSupport().appName(str(__file__)).getOrCreate()
sdf = spark.createDataFrame(df)

spark_w = SparkWaterfall(columns=['A'], distinct_columns=['B'])
spark_w.log(table=sdf, reason='example', configuration_flag='initial')
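Successive log calls for the same table accumulate rows in the logging DataFrame, with the delta (Δ) columns computed against the previous entry, so a filter-then-log sequence builds up a full waterfall. A minimal sketch continuing the Pandas example above:

filtered_df = df[df['A'] > 2]
pandas_w.log(table=filtered_df, reason='kept A > 2', configuration_flag='threshold=2')

# The second row records the new counts plus their differences versus the first row.
print(pandas_w.to_markdown())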

Logs table (distinct) counts to logging DataFrame.

Parameters:

| Name               | Type         | Description                                                                            | Default  |
|:-------------------|:-------------|:----------------------------------------------------------------------------------------|:---------|
| table              | pd.DataFrame | DataFrame that the filtering is applied to                                             | required |
| reason             | str          | Specifies reasoning for DataFrame filtering step                                       | None     |
| configuration_flag | str          | Specifies configurations flag used for DataFrame filtering step                        | None     |
| table_name         | str          | First column in table is the table id column that should contain the table name value | None     |

Examples:

>>> waterfall = PandasWaterfall()
>>> waterfall.log(table, reason='Filtered in-scope bicycles', configuration_flag='inscope=True',
... table_name='sample_table')

Returns:

| Type | Description |
|:-----|:------------|
| None | None        |

Source code in waterfall_logging/log.py
def log(
    self,
    table: pd.DataFrame | pyspark.sql.DataFrame,
    reason: str | None = None,
    configuration_flag: str | None = None,
    table_name: str | None = None,
) -> None:
    """Logs table (distinct) counts to logging DataFrame.

    Args:
        table (pd.DataFrame): DataFrame that the filtering is applied to
        reason (str): Specifies reasoning for DataFrame filtering step
        configuration_flag (str): Specifies configurations flag used for DataFrame filtering step
        table_name (str): First column in table is the `table id` column that should contain the table name value

    Examples:
        >>> waterfall = PandasWaterfall()
        >>> waterfall.log(table, reason='Filtered in-scope bicycles', configuration_flag='inscope=True',
        ... table_name='sample_table')

    Returns:
        None

    """
    table_name = table_name or self.table_name

    if self._log is None:
        self._log = self._create_log()

    current_table = self._log.loc[self._log.iloc[:, 0] == table_name]
    entries = self._count_entries(table)

    prior_entries = (
        entries.copy() if current_table.empty else current_table[self._all_columns].iloc[-1, :].to_list()
    )

    column_entries = []
    for entry, prior_entry in zip(entries, prior_entries):
        column_entries += [entry, entry - prior_entry]

    calculated_columns = [table_name] + column_entries + [reason, configuration_flag]

    self._log.loc[len(self._log)] = calculated_columns

Write to Markdown file

Write Waterfall logging tables to Markdown files.

Under the hood it uses pandas.to_markdown. All arguments accepted by the pandas.to_markdown() function can also be passed to this function.
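Since to_markdown returns the table as a string, persisting the log to a file is straightforward (a minimal sketch; the output path is just an example):

from waterfall_logging.log import PandasWaterfall

waterfall = PandasWaterfall()
# ... log some filtering steps first ...

with open('output/waterfall.md', 'w') as f:
    f.write(waterfall.to_markdown(index=False))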

Print DataFrame in Markdown-friendly format.

Parameters:

| Name  | Type | Description            | Default |
|:------|:-----|:-----------------------|:--------|
| index | bool | Add index (row) labels | False   |

Examples:

>>> waterfall = PandasWaterfall()
>>> print(waterfall.to_markdown(index=True))
|    | Table  |   col1 |   Δ col1 |   Rows |   Δ Rows | Reason  | Configurations flag |
|---:|:-------|-------:|---------:|-------:|---------:|:--------|:--------------------|
|  0 | table1 |     50 |        0 |   2727 |        0 | example | to_markdown         |
|  1 | table1 |    150 |      100 |   2827 |        0 | example | to_markdown         |
>>> print(waterfall.to_markdown(index=False))
| Table  |   col1 |   Δ col1 |   Rows |   Δ Rows | Reason  | Configurations flag |
|:-------|-------:|---------:|-------:|---------:|:--------|:--------------------|
| table1 |     50 |        0 |   2727 |        0 | example | to_markdown         |
| table1 |    150 |      100 |   2827 |        0 | example | to_markdown         |

Returns:

| Type | Description                           |
|:-----|:--------------------------------------|
| str  | DataFrame in Markdown-friendly format |

Source code in waterfall_logging/log.py
def to_markdown(self, *args, index=False, **kwargs):
    """Print DataFrame in Markdown-friendly format.

    Args:
        index (bool): Add index (row) labels

    Examples:
        >>> waterfall = PandasWaterfall()
        >>> print(waterfall.to_markdown(index=True))
        |    | Table  |   col1 |   Δ col1 |   Rows |   Δ Rows | Reason  | Configurations flag |
        |---:|:-------|-------:|---------:|-------:|---------:|:--------|:--------------------|
        |  0 | table1 |     50 |        0 |   2727 |        0 | example | to_markdown         |
        |  1 | table1 |    150 |      100 |   2827 |        0 | example | to_markdown         |
        >>> print(waterfall.to_markdown(index=False))
        | Table  |   col1 |   Δ col1 |   Rows |   Δ Rows | Reason  | Configurations flag |
        |:-------|-------:|---------:|-------:|---------:|:--------|:--------------------|
        | table1 |     50 |        0 |   2727 |        0 | example | to_markdown         |
        | table1 |    150 |      100 |   2827 |        0 | example | to_markdown         |

    Returns:
        (str): DataFrame in Markdown-friendly format

    """
    return self._log.to_markdown(*args, index=index, **kwargs)

Read from Markdown file

Read Waterfall logging tables from Markdown files.

Under the hood it uses pandas.read_table. All arguments accepted by pandas.read_table() can also be passed to this function.

Some useful arguments:

  • The separator in the Markdown file is a pipe: sep='|'.
  • The header with column names starts at row index 0: header=0.
  • The Markdown file does not contain an index column: index_col=False.
  • The row that separates the header from the values is the second row (index 1): skiprows=[1].
  • Markdown column names are preceded by initial spaces, which are removed when reading: skipinitialspace=True.
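Putting these arguments together (a minimal sketch; the file path is the one used in the examples below):

from waterfall_logging.log import PandasWaterfall

waterfall = PandasWaterfall()
waterfall.read_markdown(
    filepath_or_buffer='output/tests/read_markdown_table.md',
    sep='|', header=0, index_col=False, skiprows=[1], skipinitialspace=True,
)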

Reads table from Markdown file.

Parameters:

| Name               | Type                                  | Description | Default  |
|:-------------------|:--------------------------------------|:------------|:---------|
| filepath_or_buffer | str, path object or file-like object  | Any valid string path is acceptable. The string could be a URL. Valid URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is expected. A local file could be: file://localhost/path/to/table.csv. If you want to pass in a path object, pandas accepts any os.PathLike. By file-like object, we refer to objects with a read() method, such as a file handle (e.g. via the builtin open function) or StringIO. | required |
| columns            | List[str]                             | Specifies which columns to read in and log | None |

Examples:

>>> f = open('output/tests/read_markdown_table.md', 'r')
>>> print(f.read())
| Table  |   col1 |   Δ col1 |   Rows |   Δ Rows | Reason  | Configurations flag   |
|:-------|-------:|---------:|-------:|---------:|:--------|:----------------------|
| table1 |     50 |        0 |   2727 |        0 | initial | read_markdown         |
| table1 |    150 |      100 |   2827 |        0 | add-on  | read_markdown         |
| table1 |    250 |      100 |   2927 |      100 | extra   | read_markdown         |
>>> waterfall = PandasWaterfall()
>>> waterfall.read_markdown(
...     filepath_or_buffer='output/tests/read_markdown_table.md',
...     sep='|', header=0, index_col=False, skiprows=[1], skipinitialspace=True
... )
>>> print(waterfall._log)
    Table  col1  Δ col1  Rows  Δ Rows   Reason Configurations flag
0  table1    50       0  2727       0  initial       read_markdown
1  table1   150     100  2827       0   add-on       read_markdown
2  table1   250     100  2927     100    extra       read_markdown
>>> print(type(waterfall._log))
<class 'pandas.core.frame.DataFrame'>

Returns:

| Type | Description |
|:-----|:------------|
| None | None        |

Source code in waterfall_logging/log.py
def read_markdown(
    self,
    filepath_or_buffer: FilePath | ReadCsvBuffer[bytes] | ReadCsvBuffer[str],
    columns: List[str] | None = None,
    *args,
    **kwargs,
) -> None:
    """Reads table from Markdown file.

    Args:
        filepath_or_buffer (str, path object or file-like object):
            Any valid string path is acceptable. The string could be a URL. Valid
            URL schemes include http, ftp, s3, gs, and file. For file URLs, a host is
            expected. A local file could be: file://localhost/path/to/table.csv.
            If you want to pass in a path object, pandas accepts any ``os.PathLike``.
            By file-like object, we refer to objects with a ``read()`` method, such as
            a file handle (e.g. via builtin ``open`` function) or ``StringIO``.
        columns (List[str]): Specifies which columns to read in and log

    Examples:
        >>> f = open('output/tests/read_markdown_table.md', 'r')
        >>> print(f.read())
        | Table  |   col1 |   Δ col1 |   Rows |   Δ Rows | Reason  | Configurations flag   |
        |:-------|-------:|---------:|-------:|---------:|:--------|:----------------------|
        | table1 |     50 |        0 |   2727 |        0 | initial | read_markdown         |
        | table1 |    150 |      100 |   2827 |        0 | add-on  | read_markdown         |
        | table1 |    250 |      100 |   2927 |      100 | extra   | read_markdown         |
        >>> waterfall = PandasWaterfall()
        >>> waterfall.read_markdown(
        ...     filepath_or_buffer='output/tests/read_markdown_table.md',
        ...     sep='|', header=0, index_col=False, skiprows=[1], skipinitialspace=True
        ... )
        >>> print(waterfall._log)
            Table  col1  Δ col1  Rows  Δ Rows   Reason Configurations flag
        0  table1    50       0  2727       0  initial       read_markdown
        1  table1   150     100  2827       0   add-on       read_markdown
        2  table1   250     100  2927     100    extra       read_markdown
        >>> print(type(waterfall._log))
        <class 'pandas.core.frame.DataFrame'>

    Returns:
        None

    """
    self._log = (
        pd.read_table(filepath_or_buffer, *args, **kwargs)
        # strips trailing whitespaces in column names
        # drops columns with all NA values
        .rename(str.rstrip, axis="columns").dropna(axis=1, how="all")
        # strips trailing whitespaces in column values
        .apply(lambda row: row.str.rstrip() if row.dtype == object else row)
    )
    self._log = self._log[columns] if columns else self._log

Plot Waterfall

Plots Waterfall logging tables in a chart.

Under the hood it uses a Plotly waterfall chart: plotly.graph_objects.Waterfall.

Note

Make sure to use a unique reason argument for each logging step, otherwise the plot will render incorrectly.

Plots a logging DataFrame column in a waterfall chart.

Parameters:

| Name            | Type | Description                                                                  | Default  |
|:----------------|:-----|:------------------------------------------------------------------------------|:---------|
| y_col           | str  | Specifies column that contains absolute value counts for y-axis of plot      | 'Rows'   |
| y_col_delta     | str  | Specifies column that contains delta value counts for y-axis of plot         | 'Δ Rows' |
| x_col           | str  | Specifies column that contains the filtering explanation for x-axis of plot  | 'Reason' |
| drop_zero_delta | bool | Whether to remove rows for y_col_delta that contain zeros                    | False    |

Examples:

>>> waterfall = PandasWaterfall()
>>> fig = waterfall.plot(y_col='Rows', y_col_delta='Δ Rows', x_col='Reason',
... textfont=dict(family='sans-serif', size=11),
... connector={'line': {'color': 'rgba(0,0,0,0)'}},
... totals={'marker': {'color': '#dee2e6', 'line': {'color': '#dee2e6', 'width': 1}}}
... )

Returns:

| Type      | Description     |
|:----------|:----------------|
| go.Figure | Waterfall chart |

Source code in waterfall_logging/log.py
def plot(
    self,
    *args,
    y_col: str = "Rows",
    y_col_delta: str = "Δ Rows",
    x_col: str = "Reason",
    drop_zero_delta: bool = False,
    **kwargs,
) -> go.Figure:
    """Plots a logging DataFrame column in a waterfall chart.

    Args:
        y_col (str): Specifies column that contains absolute value counts for y-axis of plot
        y_col_delta (str): Specifies column that contains delta value counts for y-axis of plot
        x_col (str): Specifies column that contains the filtering explanation for x-axis of plot
        drop_zero_delta (bool): Whether to remove rows for `y_col_delta` that contain zeros

    Examples:
        >>> waterfall = PandasWaterfall()
        >>> fig = waterfall.plot(y_col='Rows', y_col_delta='Δ Rows', x_col='Reason',
        ... textfont=dict(family='sans-serif', size=11),
        ... connector={'line': {'color': 'rgba(0,0,0,0)'}},
        ... totals={'marker': {'color': '#dee2e6', 'line': {'color': '#dee2e6', 'width': 1}}}
        ... )

    Returns:
        go.Figure: waterfall chart

    """
    df = self._log.copy()
    if drop_zero_delta:
        df = df.iloc[1:]
        indices = df[(df[y_col_delta] == 0)].index
        df = df.drop(indices, inplace=False)

    measure = ["absolute"] + ["relative"] * (df.shape[0] - 1) + ["total"]
    x = df[x_col].to_list() + ["Total"]
    y = [df.loc[df.index[0], y_col]] + [x for x in df[y_col_delta][1:]] + [df.loc[df.index[-1], y_col]]

    return go.Figure(
        go.Waterfall(
            *args,
            measure=measure,
            x=x,
            y=y,
            text=y,
            **kwargs,
        ),
    )

See below for a more in-depth example, including fig.update_layout and fig.update_traces.

from waterfall_logging.log import PandasWaterfall

waterfall_log = PandasWaterfall()
waterfall_log.read_markdown(filepath_or_buffer='output/tests/read_markdown_table.md',
    sep='|', header=0, index_col=False, skiprows=[1], skipinitialspace=True
)

fig = waterfall_log.plot(y_col='Rows',
    textfont=dict(family='sans-serif', size=11),
    connector={'line': {'color': 'rgba(0,0,0,0)'}},
    totals={'marker': {'color': '#dee2e6', 'line': {'color': '#dee2e6', 'width': 1}}}
)

fig.update_layout(
    autosize=True,
    width=1000,
    height=1000,
    title='Data filtering steps',
    xaxis=dict(title='Filtering steps'),
    yaxis=dict(title='# of entries'),
    showlegend=False,
    waterfallgroupgap=0.1,
)

fig.update_traces(
    textposition='outside',
)

fig.write_image('output/tests/read_markdown_table.png')