Examples

API Usage Examples

Backend Setup

 1"""
 2Shows the two ways of initializing a Parfun backend.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.api_usage.backend_setup
 8"""
 9
10import parfun as pf
11
12
if __name__ == "__main__":
    # First way: select the backend with a plain function call (the "process-wise" setup).
    pf.set_parallel_backend("local_multiprocessing")

    # Second way: select the backend through a Python context manager.
    remote_backend = pf.set_parallel_backend_context(
        "scaler_remote",
        scheduler_address="tcp://scaler.cluster:1243",
    )
    with remote_backend:
        pass  # Parallel tasks started here will run over a remotely set up Scaler cluster.

Partitioning API

all_arguments

 1"""
 2Uses `all_arguments` to partition all the input data of a parallel function.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.api_usage.all_arguments
 8"""
 9
10import pandas as pd
11
12import parfun as pf
13
14
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_group(by=["year", "month"])),
    combine_with=pf.dataframe.concat,
)
def monthly_sum(sales: pd.DataFrame, costs: pd.DataFrame) -> pd.DataFrame:
    """Outer-merge ``sales`` and ``costs`` on their date columns and sum the numeric columns per day.

    Both dataframes must provide "year", "month" and "day" columns. The decorator partitions both arguments by
    ("year", "month") group and concatenates the partial results.
    """
    day_keys = ["year", "month", "day"]

    combined = sales.merge(costs, on=day_keys, how="outer")

    # One output row per (year, month, day), non-numeric columns are left out of the sum.
    return combined.groupby(day_keys, as_index=False).sum(numeric_only=True)
25
26
if __name__ == "__main__":
    # Sales and costs are recorded for the same three days.
    calendar = {
        "year": [2024, 2024, 2024],
        "month": [1, 1, 2],
        "day": [1, 2, 1],
    }

    sales = pd.DataFrame({**calendar, "sales": [100, 200, 150]})
    costs = pd.DataFrame({**calendar, "costs": [50, 70, 80]})

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = monthly_sum(sales, costs)

    # Expected output:
    #     year  month  day  sales  costs
    # 0  2024      1    1    100     50
    # 1  2024      1    2    200     70
    # 2  2024      2    1    150     80
    print(result)

per_argument

 1"""
 2Uses `per_argument` to partition the input data from multiple arguments.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.api_usage.per_argument
 8"""
 9
10from typing import List
11
12import pandas as pd
13
14import parfun as pf
15
16
@pf.parallel(
    split=pf.per_argument(
        factors=pf.py_list.by_chunk,
        dataframe=pf.dataframe.by_row,
    ),
    combine_with=pf.dataframe.concat,
)
def multiply_by_row(factors: List[int], dataframe: pd.DataFrame) -> pd.DataFrame:
    """Multiply every row of ``dataframe`` by the factor found at the same position in ``factors``.

    Each argument gets its own partitioning function (chunks for the list, rows for the dataframe). The function
    asserts that the list and the dataframe it receives have the same length.
    """
    n_factors, n_rows = len(factors), len(dataframe)
    assert n_factors == n_rows

    return dataframe.mul(factors, axis="index")
27
28
if __name__ == "__main__":
    factors = [10, 20, 30]
    dataframe = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = multiply_by_row(factors, dataframe)

    # Expected output:
    #     A    B
    # 0  10   40
    # 1  40  100
    # 2  90  180
    print(result)

Custom Partition Function

 1"""
 2Shows how to use custom Python generators and functions as partitioning and combining functions.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.api_usage.custom_generators
 8"""
 9
10from typing import Generator, Iterable, Tuple
11
12import pandas as pd
13
14import parfun as pf
15
16
def partition_by_day_of_week(dataframe: pd.DataFrame) -> Generator[Tuple[pd.DataFrame], None, None]:
    """Yields one partition of ``dataframe`` per day of the week found in its "datetime" column.

    Every partition is wrapped in a one-element tuple: the yielded tuples have to match the parameters of the
    parallel function, which takes a single dataframe here.
    """
    weekday = dataframe["datetime"].dt.day_of_week

    yield from ((rows,) for _, rows in dataframe.groupby(weekday))
22
23
def combine_results(dataframes: Iterable[pd.DataFrame]) -> pd.DataFrame:
    """Concatenates the partial results and returns them sorted by their "datetime" values."""
    merged = pd.concat(dataframes)
    return merged.sort_values(by="datetime")
27
28
@pf.parallel(
    split=pf.all_arguments(partition_by_day_of_week),
    combine_with=combine_results,
)
def daily_mean(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Averages the numeric columns of ``dataframe`` for every calendar date of its "datetime" column.

    The input is partitioned with the custom `partition_by_day_of_week` generator and the partial results are
    merged with the custom `combine_results` function.
    """
    calendar_date = dataframe["datetime"].dt.date
    return dataframe.groupby(calendar_date).mean(numeric_only=True)
35
36
if __name__ == "__main__":
    # Probing times
    probing_times = pd.to_datetime([
        "2025-04-01 06:00", "2025-04-01 18:00", "2025-04-02 10:00", "2025-04-03 14:00", "2025-04-03 23:00",
        "2025-04-04 08:00", "2025-04-05 12:00", "2025-04-06 07:00", "2025-04-06 20:00", "2025-04-07 09:00",
        "2025-04-08 15:00", "2025-04-09 11:00", "2025-04-10 13:00", "2025-04-11 06:00", "2025-04-12 16:00",
        "2025-04-13 17:00", "2025-04-14 22:00", "2025-04-15 10:00", "2025-04-16 09:00", "2025-04-17 13:00",
        "2025-04-18 14:00", "2025-04-19 18:00", "2025-04-20 07:00", "2025-04-21 20:00", "2025-04-22 15:00",
    ])

    # Temperature values (°C), one per probing time
    temperatures = [
        7.2, 10.1, 9.8, 12.5, 11.7,
        8.9, 13.0, 7.5, 10.8, 9.3,
        12.1, 11.5, 13.3, 6.8, 12.7,
        13.5, 9.2, 10.0, 9.9, 11.8,
        12.4, 10.6, 7.9, 9.5, 11.6,
    ]

    # Humidity values (%), one per probing time
    humidities = [
        85, 78, 80, 75, 76,
        88, 73, 89, 77, 84,
        72, 74, 70, 90, 71,
        69, 86, 81, 83, 76,
        74, 79, 87, 82, 73,
    ]

    dataframe = pd.DataFrame({
        "datetime": probing_times,
        "temperature": temperatures,
        "humidity": humidities,
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = daily_mean(dataframe)

    print(result)

Enforced Partition Size

 1"""
 2Shows how `fixed_partition_size` and `initial_partition_size` control the size of the partitions of a parallel function.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.api_usage.partition_size
 8"""
 9
10import numpy as np
11import pandas as pd
12
13import parfun as pf
14
15
# `fixed_partition_size=1000` makes Parfun always split the input dataframe in chunks of 1000 rows.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    fixed_partition_size=1000,
)
def fixed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    """Returns the sum of all the values held by ``dataframe``."""
    cells = dataframe.values
    return cells.sum()
24
25
# `initial_partition_size=1000` makes Parfun start with chunks of 1000 rows, and keep that size until its
# machine-learning algorithm finds a better estimate.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=1000,
)
def initial_partition_size_sum(dataframe: pd.DataFrame) -> float:
    """Returns the sum of all the values held by ``dataframe``."""
    cells = dataframe.values
    return cells.sum()
35
36
def _quarter_of_rows(dataframe: pd.DataFrame) -> int:
    """Partition size derived from the input: a quarter of the row count, but never fewer than 10 rows."""
    return max(10, len(dataframe) // 4)


# `fixed_partition_size` and `initial_partition_size` both accept a callable in place of an integer. The callable
# lets the partition size be computed from the input parameters.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=_quarter_of_rows,
)
def computed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    """Returns the sum of all the values held by ``dataframe``."""
    cells = dataframe.values
    return cells.sum()
46
47
if __name__ == "__main__":
    # 100 rows of random integers spread over three columns.
    random_values = np.random.randint(0, 100, size=(100, 3))
    dataframe = pd.DataFrame(random_values, columns=["alpha", "beta", "gamma"])

    with pf.set_parallel_backend_context("local_multiprocessing"):
        # The three variants are run in turn on the same input.
        for parallel_sum_function in (
            fixed_partition_size_sum,
            initial_partition_size_sum,
            computed_partition_size_sum,
        ):
            print(parallel_sum_function(dataframe))

Profiling

 1"""
 2Demonstrates the use of the `profile` and `trace_export` parameters for profiling Parfun's performance.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.api_usage.profiling
 8"""
 9
10from typing import List
11import random
12
13import parfun as pf
14
15
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=sum,
    # The two parameters this example demonstrates. NOTE(review): the trace is presumably written to the given
    # CSV file -- confirm against Parfun's documentation.
    profile=True,
    trace_export="parallel_sum_trace.csv",
)
def parallel_sum(values: List) -> int:
    """Returns the sum of ``values``.

    The list is split in chunks, every chunk is summed on its own, and the partial sums are themselves combined
    with `sum`. The result is therefore a number, not a list.
    """
    return sum(values)
24
25
if __name__ == "__main__":
    N_VALUES = 100_000

    # Random integers between 0 and 99, both included.
    values = [random.randint(0, 99) for _ in range(N_VALUES)]

    with pf.set_parallel_backend_context("local_multiprocessing"):
        total = parallel_sum(values)
        print("Sum =", total)

Nested Parfun Calls

 1"""
 2Shows how a Parfun function can be called from within another Parfun function.
 3
 4Usage:
 5
 6    $ git clone https://github.com/Citi/parfun && cd parfun
 7    $ python -m examples.api_usage.nested_functions
 8"""
 9
10import pprint
11import random
12from typing import List
13
14import parfun as pf
15
16
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_vectors(vec_a: List, vec_b: List) -> List:
    """Returns the element-wise sum of two vectors.

    As with `zip`, the result stops at the end of the shorter vector.
    """
    summed = []
    for left, right in zip(vec_a, vec_b):
        summed.append(left + right)
    return summed
24
25
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_matrices(mat_a: List[List], mat_b: List[List]) -> List[List]:
    """Returns the sum of two matrices given as lists of rows.

    Every pair of rows is added with `add_vectors`, which is itself a Parfun function: this is the nested call
    that the example demonstrates.
    """
    summed_rows = []
    for row_a, row_b in zip(mat_a, mat_b):
        summed_rows.append(add_vectors(row_a, row_b))
    return summed_rows
33
34
if __name__ == "__main__":
    N_ROWS, N_COLS = 10, 10

    # Two random matrices of integers between 0 and 99.
    mat_a = [[random.randint(0, 99) for _ in range(N_COLS)] for _ in range(N_ROWS)]
    mat_b = [[random.randint(0, 99) for _ in range(N_COLS)] for _ in range(N_ROWS)]

    for label, matrix in (("A =", mat_a), ("B =", mat_b)):
        print(label)
        pprint.pprint(matrix)

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = add_matrices(mat_a, mat_b)

    print("A + B =")
    pprint.pprint(result)

Application Examples

Count Bigrams In A Text Parallelly

 1"""
 2Counts the most common two-letter sequences (bigrams) in the content of a URL.
 3
 4Usage:
 5
 6    $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
 7    $ python -m examples.count_bigrams.main [--backend BACKEND] [--backend_args]
 8"""
 9
10import argparse
11import collections
12import json
13import ssl
14
15from typing import Counter, Iterable, List
16from urllib.request import urlopen
17
18import parfun as pf
19
20
def sum_counters(counters: Iterable[Counter[str]]) -> Counter[str]:
    """Adds all the given counters together, starting from an empty counter."""
    total: Counter[str] = collections.Counter()
    for counter in counters:
        total = total + counter
    return total
23
24
@pf.parallel(
    split=pf.per_argument(
        lines=pf.py_list.by_chunk
    ),
    combine_with=sum_counters,
)
def count_bigrams(lines: List[str]) -> Counter:
    """Counts every two-character sequence found inside the whitespace-separated words of ``lines``.

    Bigrams never span two words. The lines are processed by chunks, and the per-chunk counters are merged with
    `sum_counters`.
    """
    bigram_counts: Counter[str] = collections.Counter()

    for line in lines:
        for word in line.split():
            # Pairing the word with itself shifted by one character yields all its adjacent character pairs.
            bigram_counts.update(first + second for first, second in zip(word, word[1:]))

    return bigram_counts
41
42
if __name__ == "__main__":
    TOP_K = 10

    parser = argparse.ArgumentParser()
    parser.add_argument("--text-url", default="https://www.gutenberg.org/ebooks/100.txt.utf-8")
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"max_workers\": 4}'",
    )
    args = parser.parse_args()

    # NOTE(review): `ssl._create_unverified_context()` is a private API that turns off TLS certificate
    # verification. It is kept as is in case it is a deliberate workaround, but the download is not authenticated:
    # prefer `ssl.create_default_context()` wherever the CA certificates are available.
    with urlopen(args.text_url, context=ssl._create_unverified_context()) as response:
        content = response.read().decode("utf-8").splitlines()

    with pf.set_parallel_backend_context(args.backend, **args.backend_args):
        counts = count_bigrams(content)

    # The counter holds bigrams, not words: label the output accordingly.
    print(f"Top {TOP_K} bigrams:")
    for bigram, count in counts.most_common(TOP_K):
        print(f"\t{bigram:<10}:\t{count}")

Parallel Training Random-Forest On Californian Housing Data

 1"""
 2Trains a decision tree regressor on the California housing dataset from scikit-learn.
 3
 4Compares the training time of a sequential run with the one of a run where the training dataset is partitioned by Parfun.
 5
 6Usage:
 7
 8    $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
 9    $ python -m examples.california_housing.main [--backend BACKEND] [--backend_args]
10"""
11
12import argparse
13import json
14import timeit
15
16from typing import List
17
18import numpy as np
19import pandas as pd
20
21from sklearn.datasets import fetch_california_housing
22from sklearn.base import RegressorMixin
23from sklearn.tree import DecisionTreeRegressor
24
25import parfun as pf
26
27
class MeanRegressor(RegressorMixin):
    """Regressor that averages the predictions of several other regressors.

    It is used as the combining step of `train_regressor`: every partition of the training dataset produces its own
    regressor, and the final prediction is the mean of the individual predictions.
    """

    def __init__(self, regressors: List[RegressorMixin]) -> None:
        super().__init__()
        self._regressors = regressors

    def predict(self, X):
        """Returns, for every sample of ``X``, the mean of the predictions made by the wrapped regressors."""
        predictions = [regressor.predict(X) for regressor in self._regressors]

        # `axis=0` averages across the regressors and keeps one value per sample. Without it, `np.mean` flattens
        # the stacked predictions and returns a single scalar for the whole of `X`.
        return np.mean(predictions, axis=0)
35
36
@pf.parallel(
    split=pf.per_argument(dataframe=pf.dataframe.by_row),
    combine_with=lambda regressors: MeanRegressor(list(regressors)),
)
def train_regressor(dataframe: pd.DataFrame, feature_names: List[str], target_name: str) -> RegressorMixin:
    """Fits a new `DecisionTreeRegressor` on the rows of ``dataframe``.

    Only ``dataframe`` is partitioned (by row). The regressors fitted on the partitions are then wrapped in a
    single `MeanRegressor`.
    """
    features = dataframe[feature_names]
    targets = dataframe[[target_name]]

    tree = DecisionTreeRegressor()
    tree.fit(features, targets)

    return tree
47
48
if __name__ == "__main__":
    N_MEASURES = 5

    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"max_workers\": 4}'",
    )
    args = parser.parse_args()

    dataset = fetch_california_housing(download_if_missing=True)

    feature_names = dataset["feature_names"]
    target_name = dataset["target_names"][0]

    # Features and target are stored in a single dataframe, the target being its last column.
    dataframe = pd.DataFrame(dataset["data"], columns=feature_names)
    dataframe[target_name] = dataset["target"]

    # The same measurement is made twice: on the single-process backend, then on the requested backend.
    measurements = [
        ("Sequential training duration:", "local_single_process", {}),
        ("Parallel training duration:", args.backend, args.backend_args),
    ]

    for label, backend_name, backend_kwargs in measurements:
        with pf.set_parallel_backend_context(backend_name, **backend_kwargs):
            # A first training is run before the timed ones.
            regressor = train_regressor(dataframe, feature_names, target_name)

            total_duration = timeit.timeit(
                lambda: train_regressor(dataframe, feature_names, target_name),
                number=N_MEASURES,
            )
            duration = total_duration / N_MEASURES

            print(label, duration)

Compute Electricity Production Statistics Parallelly

  1"""
  2Based on the monthly electricity production data from ENTSO-E, plots the percentage of renewable energy production for
  3the European electricity grid.
  4
  5Usage:
  6
  7    $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
  8    $ pip install -r examples/requirements.txt
  9    $ python -m examples.europe_electricity.main [--plot] [--backend BACKEND] [--backend_args]
 10
 11"""
 12
 13import argparse
 14import json
 15from typing import List
 16
 17import pandas as pd
 18
 19import parfun as pf
 20
 21
 22def fetch_production_data(year: int) -> pd.DataFrame:
 23    """
 24    Downloads the monthly production data for the given year.
 25
 26    Sourced from https://www.entsoe.eu/data/power-stats/.
 27    """
 28
 29    url = f"https://www.entsoe.eu/publications/data/power-stats/{year}/monthly_domestic_values_{year}.csv"
 30
 31    result = pd.read_csv(url, sep=r"\t|,|;", engine="python")
 32
 33    # Some newer datasets use "Area" instead of "Country"
 34    if "Area" in result.columns:
 35        result["Country"] = result["Area"]
 36
 37    return result[["Year", "Month", "Category", "Country", "ProvidedValue"]]
 38
 39
 40def make_consumption_negative(production_data: pd.DataFrame) -> pd.DataFrame:
 41    """
 42    Make consumption values negative production values.
 43
 44    Some production categories have positive consumption values (e.g. "Consumption of Hydro Water Reservoir"). This
 45    function transforms these values in their production counter parts, but with a negative value. This simplifies
 46    subsequent processing.
 47    """
 48
 49    PREFIX = "Consumption of "
 50
 51    result = production_data.copy()
 52
 53    is_consumption = result["Category"].str.startswith(PREFIX)
 54
 55    result.loc[is_consumption, "Category"] = result.loc[is_consumption, "Category"].str.replace(PREFIX, "", regex=False)
 56    result.loc[is_consumption, "ProvidedValue"] *= -1
 57
 58    return result
 59
 60
 61def group_production_by_type(production_data: pd.DataFrame) -> pd.DataFrame:
 62    """Groups and sums all production data by type ("Fossil", "Nuclear", "Renewable" and "Other")."""
 63
 64    fossil_sources = {
 65        "Fossil Gas", "Fossil Hard coal", "Fossil Oil",
 66        "Fossil Brown coal/Lignite", "Fossil Coal-derived gas",
 67        "Fossil Oil shale", "Fossil Peat",
 68    }
 69    nuclear_sources = {"Nuclear"}
 70    renewable_sources = {
 71        "Biomass", "Solar", "Wind Onshore", "Wind Offshore", "Geothermal",
 72        "Hydro Pumped Storage", "Hydro Run-of-river and poundage",
 73        "Hydro Water Reservoir", "Marine", "Other renewable",
 74    }
 75
 76    def map_category(category: str) -> str:
 77        if category in fossil_sources:
 78            return "Fossil"
 79        elif category in nuclear_sources:
 80            return "Nuclear"
 81        elif category in renewable_sources:
 82            return "Renewable"
 83        else:
 84            return "Other"
 85
 86    result = production_data.copy()
 87
 88    result["EnergyType"] = result["Category"].map(map_category)
 89    del result["Category"]
 90
 91    return result.groupby(["Year", "Month", "EnergyType"])["ProvidedValue"].sum().reset_index()
 92
 93
 94def monthly_percentage_production(production_data: pd.DataFrame) -> pd.DataFrame:
 95    """Returns the monthly production percentage for every month and every energy source type."""
 96
 97    result = production_data.pivot_table(index=["Year", "Month"], columns="EnergyType", values="ProvidedValue")
 98
 99    result = result.div(result.sum(axis=1), axis=0) * 100  # make it percentages
100
101    # Uses datetime for year-month
102    result.index = pd.to_datetime({
103        "year": result.index.get_level_values(0),
104        "month": result.index.get_level_values(1),
105        "day": 1,
106    })
107
108    result.sort_index(ascending=True)  # sort by date
109
110    return result
111
112
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.dataframe.concat,
    initial_partition_size=2,
)
def get_monthly_percentage_production(years: List[int]) -> pd.DataFrame:
    """Downloads and processes the production data of every given year, and concatenates the yearly results.

    Every year goes through the same pipeline: download, conversion of the consumption values, grouping by energy
    type, then computation of the monthly percentages. The list of years is split in chunks, starting with a
    partition size of 2.
    """
    yearly_percentages = [
        monthly_percentage_production(
            group_production_by_type(
                make_consumption_negative(
                    fetch_production_data(year)
                )
            )
        )
        for year in years
    ]

    return pd.concat(yearly_percentages)
130
131
def plot_electricity_production(production_percentages: pd.DataFrame) -> None:
    """Shows the monthly production percentages as a stacked bar chart, with one bar per month.

    Args:
        production_percentages: dataframe indexed by date, with one column per energy type. It is left unmodified.
    """
    # Imported here so that matplotlib is only needed when a plot is actually requested.
    import matplotlib.pyplot as plt

    colors = {
        "Fossil": "lightcoral",
        "Nuclear": "violet",
        "Renewable": "lightgreen",
        "Other": "lightsteelblue",
    }

    # The dates are replaced by "Jan 2024"-like labels on a copy, so that the caller's dataframe keeps its index.
    labelled_percentages = production_percentages.copy()
    labelled_percentages.index = labelled_percentages.index.strftime("%b %Y")
    labelled_percentages.plot(kind="bar", stacked=True, figsize=(10, 6), width=1, color=colors)

    plt.title("Europe's monthly electricity production by source")
    plt.ylabel("Percentage (%)")
    plt.xlabel("Month")
    plt.legend(title="Energy source", loc="upper left")
    plt.grid(axis="y", linestyle="--")
    plt.ylim(0, 100)

    plt.tight_layout()
    plt.show()
154
155
def main():
    """Parses the command line, computes the production percentages of 2019 to 2024, then plots or prints them."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--plot", action="store_true", help="Plot data instead of printing dataframe")
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"n_workers\": 4}'",
    )
    options = parser.parse_args()

    years = list(range(2019, 2025))

    with pf.set_parallel_backend_context(options.backend, **options.backend_args):
        processed_data = get_monthly_percentage_production(years)

    # `--plot` selects the chart, the dataframe is printed otherwise.
    output = plot_electricity_production if options.plot else print
    output(processed_data)


if __name__ == "__main__":
    main()

Compute Portfolio Metrics Parallelly

 1"""
 2Based on a portfolio of stocks, computes basic statistics.
 3
 4Usage:
 5
 6    $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
 7    $ python -m examples.portfolio_metrics.main [--backend BACKEND] [--backend_args]
 8"""
 9
10import argparse
11import json
12from typing import List
13
14import pandas as pd
15
16import parfun as pf
17
18
@pf.parallel(
    split=pf.per_argument(portfolio=pf.dataframe.by_group(by="country")),
    combine_with=pf.dataframe.concat,
)
def relative_metrics(portfolio: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """
    Adds per-country relative metrics to a copy of ``portfolio``.

    For each of the requested columns, three columns are added: the difference to the mean of the country
    (`<column>_diff_to_mean`), the square of that difference (`<column>_sq_diff_to_mean`), and that difference
    divided by the standard deviation of the country (`<column>_relative_to_mean`).
    """

    output = portfolio.copy()  # the input dataframe is left untouched.

    for country in output["country"].unique():
        in_country = output["country"] == country

        for column in columns:
            values = output.loc[in_country, column]
            diff_to_mean = values - values.mean()

            output.loc[in_country, f"{column}_diff_to_mean"] = diff_to_mean
            output.loc[in_country, f"{column}_sq_diff_to_mean"] = diff_to_mean ** 2
            output.loc[in_country, f"{column}_relative_to_mean"] = diff_to_mean / values.std()

    return output
43
44
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"n_workers\": 4}'",
    )
    args = parser.parse_args()

    # One row per company of the portfolio.
    portfolio = pd.DataFrame({
        "company": ["Apple", "Citigroup", "ASML", "Volkswagen", "Tencent"],
        "industry": ["technology", "banking", "technology", "manufacturing", "manufacturing"],
        "country": ["US", "US", "NL", "DE", "CN"],
        "market_cap": [2_828_000_000_000, 80_310_000_000, 236_000_000_000, 55_550_000_000, 345_000_000_000],
        "revenue": [397_000_000_000, 79_840_000_000, 27_180_000_000, 312_000_000_000, 79_000_000_000],
        "workforce": [161_000, 240_000, 39_850, 650_951, 104_503],
    })

    measured_columns = ["market_cap", "revenue"]

    with pf.set_parallel_backend_context(args.backend, **args.backend_args):
        metrics = relative_metrics(portfolio, measured_columns)

    print(metrics)