Examples
API Usage Examples
Backend Setup
1"""
2Shows the two ways of initializing a Parfun backend.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.backend_setup
8"""
9
10import parfun as pf
11
12
if __name__ == "__main__":
    # Option 1: select the parallel backend for the whole process.
    pf.set_parallel_backend("local_multiprocessing")

    # Option 2: select a parallel backend only for the duration of a `with` block.
    scheduler_address = "tcp://scaler.cluster:1243"
    with pf.set_parallel_backend_context("scaler_remote", scheduler_address=scheduler_address):
        ...  # Parallel tasks started here run on a remotely set up Scaler cluster.
Partitioning API
all_arguments
1"""
2Uses `all_arguments` to partition all the input data of a parallel function.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.all_arguments
8"""
9
10import pandas as pd
11
12import parfun as pf
13
14
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_group(by=["year", "month"])),
    combine_with=pf.dataframe.concat,
)
def monthly_sum(sales: pd.DataFrame, costs: pd.DataFrame) -> pd.DataFrame:
    """
    Outer-joins `sales` and `costs` on their date columns and sums the numeric values of each day.

    Both dataframes are partitioned by the same (year, month) groups, and the per-partition results are concatenated.
    """
    date_keys = ["year", "month", "day"]
    merged = sales.merge(costs, on=date_keys, how="outer")
    return merged.groupby(date_keys, as_index=False).sum(numeric_only=True)
25
26
if __name__ == "__main__":
    # Both tables use the same (year, month, day) rows.
    dates = {
        "year": [2024, 2024, 2024],
        "month": [1, 1, 2],
        "day": [1, 2, 1],
    }
    sales = pd.DataFrame({**dates, "sales": [100, 200, 150]})
    costs = pd.DataFrame({**dates, "costs": [50, 70, 80]})

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = monthly_sum(sales, costs)

    print(result)
    #    year  month  day  sales  costs
    # 0  2024      1    1    100     50
    # 1  2024      1    2    200     70
    # 2  2024      2    1    150     80
per_argument
1"""
2Uses `per_argument` to partition the input data from multiple arguments.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.per_argument
8"""
9
10from typing import List
11
12import pandas as pd
13
14import parfun as pf
15
16
@pf.parallel(
    split=pf.per_argument(
        factors=pf.py_list.by_chunk,
        dataframe=pf.dataframe.by_row,
    ),
    combine_with=pf.dataframe.concat,
)
def multiply_by_row(factors: List[int], dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Multiplies every row of `dataframe` by the factor at the same position in `factors`.

    `factors` is split in chunks and `dataframe` by rows; each call expects both partitions to have the same length.
    """
    assert len(dataframe) == len(factors)
    scaled = dataframe.mul(factors, axis=0)
    return scaled
27
28
if __name__ == "__main__":
    dataframe = pd.DataFrame({"A": [1, 2, 3], "B": [4, 5, 6]})
    factors = [10, 20, 30]  # one factor per row

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = multiply_by_row(factors, dataframe)

    print(result)
    #     A    B
    # 0  10   40
    # 1  40  100
    # 2  90  180
Custom Partition Function
1"""
2Shows how to use custom Python generators and functions as partitioning and combining functions.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.custom_generators
8"""
9
10from typing import Generator, Iterable, Tuple
11
12import pandas as pd
13
14import parfun as pf
15
16
def partition_by_day_of_week(dataframe: pd.DataFrame) -> Generator[Tuple[pd.DataFrame], None, None]:
    """
    Splits `dataframe` into one partition per day of the week (Monday, Tuesday ...) of its "datetime" column.

    Partitions are yielded as 1-tuples: a partitioning function must always yield a tuple whose items match the
    parameters of the parallel function.
    """
    day_of_week = dataframe["datetime"].dt.day_of_week
    yield from ((partition,) for _day, partition in dataframe.groupby(day_of_week))
22
23
def combine_results(dataframes: Iterable[pd.DataFrame]) -> pd.DataFrame:
    """
    Concatenates the partial results and orders them chronologically.

    "datetime" may name either a column or an index level of the concatenated dataframe.
    """
    combined = pd.concat(dataframes)
    return combined.sort_values(by="datetime")
27
28
@pf.parallel(
    split=pf.all_arguments(partition_by_day_of_week),
    combine_with=combine_results,
)
def daily_mean(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Averages the numeric columns for each calendar date of the "datetime" column."""
    calendar_dates = dataframe["datetime"].dt.date
    return dataframe.groupby(calendar_dates).mean(numeric_only=True)
35
36
if __name__ == "__main__":
    probe_times = [
        "2025-04-01 06:00", "2025-04-01 18:00", "2025-04-02 10:00", "2025-04-03 14:00", "2025-04-03 23:00",
        "2025-04-04 08:00", "2025-04-05 12:00", "2025-04-06 07:00", "2025-04-06 20:00", "2025-04-07 09:00",
        "2025-04-08 15:00", "2025-04-09 11:00", "2025-04-10 13:00", "2025-04-11 06:00", "2025-04-12 16:00",
        "2025-04-13 17:00", "2025-04-14 22:00", "2025-04-15 10:00", "2025-04-16 09:00", "2025-04-17 13:00",
        "2025-04-18 14:00", "2025-04-19 18:00", "2025-04-20 07:00", "2025-04-21 20:00", "2025-04-22 15:00",
    ]
    temperatures = [  # °C
        7.2, 10.1, 9.8, 12.5, 11.7,
        8.9, 13.0, 7.5, 10.8, 9.3,
        12.1, 11.5, 13.3, 6.8, 12.7,
        13.5, 9.2, 10.0, 9.9, 11.8,
        12.4, 10.6, 7.9, 9.5, 11.6,
    ]
    humidities = [  # %
        85, 78, 80, 75, 76,
        88, 73, 89, 77, 84,
        72, 74, 70, 90, 71,
        69, 86, 81, 83, 76,
        74, 79, 87, 82, 73,
    ]

    dataframe = pd.DataFrame({
        "datetime": pd.to_datetime(probe_times),
        "temperature": temperatures,
        "humidity": humidities,
    })

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = daily_mean(dataframe)

    print(result)
Enforced Partition Size
1"""
2Uses `fixed_partition_size` and `initial_partition_size` to control the partition sizes of a parallel function.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.partition_size
8"""
9
10import numpy as np
11import pandas as pd
12
13import parfun as pf
14
15
# `fixed_partition_size` makes Parfun always split the input dataframe in chunks of 1000 rows.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    fixed_partition_size=1000,
)
def fixed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    """Sums every value of the dataframe partition; partial sums are added with `sum`."""
    return np.sum(dataframe.values)
24
25
# With `initial_partition_size`, the input dataframe is first split in chunks of 1000 rows, until Parfun's
# machine-learning algorithm finds a better estimate.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=1000,
)
def initial_partition_size_sum(dataframe: pd.DataFrame) -> float:
    """Sums every value of the dataframe partition; partial sums are added with `sum`."""
    return np.sum(dataframe.values)
35
36
# `fixed_partition_size` and `initial_partition_size` also accept a callable instead of an integer, so that the
# partition size can be derived from the input parameters. Here: a quarter of the rows, but never fewer than 10.
@pf.parallel(
    split=pf.all_arguments(pf.dataframe.by_row),
    combine_with=sum,
    initial_partition_size=lambda dataframe: max(10, len(dataframe) // 4),
)
def computed_partition_size_sum(dataframe: pd.DataFrame) -> float:
    """Sums every value of the dataframe partition; partial sums are added with `sum`."""
    return np.sum(dataframe.values)
46
47
if __name__ == "__main__":
    random_values = np.random.randint(0, 100, size=(100, 3))
    dataframe = pd.DataFrame(random_values, columns=["alpha", "beta", "gamma"])

    parallel_sums = (fixed_partition_size_sum, initial_partition_size_sum, computed_partition_size_sum)

    with pf.set_parallel_backend_context("local_multiprocessing"):
        for parallel_sum in parallel_sums:
            print(parallel_sum(dataframe))
Profiling
1"""
2Demonstrates the use of the `profile` and `trace_export` parameters for profiling Parfun's performance.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.profiling
8"""
9
10from typing import List
11import random
12
13import parfun as pf
14
15
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=sum,
    profile=True,
    trace_export="parallel_sum_trace.csv",
)
def parallel_sum(values: List[int]) -> int:
    """
    Sums a list of integers.

    Each chunk of `values` is summed independently and the partial sums are added with `sum`. `profile` and
    `trace_export` enable Parfun's profiling output, with the trace named `parallel_sum_trace.csv`.
    """
    # Returns a number, not a list: the previous `-> List` annotation was incorrect.
    return sum(values)
24
25
if __name__ == "__main__":
    N_VALUES = 100_000
    values = [random.randint(0, 99) for _ in range(N_VALUES)]

    with pf.set_parallel_backend_context("local_multiprocessing"):
        total = parallel_sum(values)
        print("Sum =", total)
Nested Parfun Calls
1"""
2Shows how a Parfun function can be called from within another Parfun function.
3
4Usage:
5
6 $ git clone https://github.com/Citi/parfun && cd parfun
7 $ python -m examples.api_usage.nested_functions
8"""
9
10import pprint
11import random
12from typing import List
13
14import parfun as pf
15
16
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_vectors(vec_a: List, vec_b: List) -> List:
    """
    Adds two vectors, element-wise.

    Both arguments are split in chunks and the per-chunk results are concatenated back into a single list.
    """
    summed = []
    for left, right in zip(vec_a, vec_b):
        summed.append(left + right)
    return summed
24
25
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.py_list.concat,
)
def add_matrices(mat_a: List[List], mat_b: List[List]) -> List[List]:
    """
    Adds two matrices, row by row.

    The rows are split in chunks, and every pair of rows is itself added by calling the parallel `add_vectors`,
    which is how this example nests Parfun calls.
    """
    row_pairs = zip(mat_a, mat_b)
    return [add_vectors(row_a, row_b) for row_a, row_b in row_pairs]
33
34
if __name__ == "__main__":
    N_ROWS, N_COLS = 10, 10

    mat_a = [[random.randint(0, 99) for _ in range(N_COLS)] for _ in range(N_ROWS)]
    mat_b = [[random.randint(0, 99) for _ in range(N_COLS)] for _ in range(N_ROWS)]

    for label, matrix in (("A =", mat_a), ("B =", mat_b)):
        print(label)
        pprint.pprint(matrix)

    with pf.set_parallel_backend_context("local_multiprocessing"):
        result = add_matrices(mat_a, mat_b)

    print("A + B =")
    pprint.pprint(result)
Application Examples
Count Bigrams in a Text in Parallel
1"""
2Counts the most common two-letter sequences (bigrams) in the content of a URL.
3
4Usage:
5
6 $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
7 $ python -m examples.count_bigrams.main [--text-url URL] [--backend BACKEND] [--backend_args JSON]
8"""
9
10import argparse
11import collections
12import json
13import ssl
14
15from typing import Counter, Iterable, List
16from urllib.request import urlopen
17
18import parfun as pf
19
20
def sum_counters(counters: Iterable[Counter[str]]) -> Counter[str]:
    """
    Merges partial counters into a single counter.

    Counters are added left to right with `+`. Following `collections.Counter` addition rules, only keys whose
    running total is positive are kept.
    """
    total: Counter[str] = collections.Counter()
    for partial in counters:
        total = total + partial
    return total
23
24
@pf.parallel(
    split=pf.per_argument(lines=pf.py_list.by_chunk),
    combine_with=sum_counters,
)
def count_bigrams(lines: List[str]) -> Counter:
    """
    Counts the two-character sequences (bigrams) inside the whitespace-separated words of `lines`.

    Bigrams never span two words: "ab cd" yields "ab" and "cd", but neither "b " nor "bc".
    """
    counter: Counter[str] = collections.Counter()

    for line in lines:
        for word in line.split():
            # Every pair of adjacent characters, e.g. "word" -> "wo", "or", "rd".
            counter.update(word[start:start + 2] for start in range(len(word) - 1))

    return counter
41
42
if __name__ == "__main__":
    TOP_K = 10

    parser = argparse.ArgumentParser()
    parser.add_argument("--text-url", default="https://www.gutenberg.org/ebooks/100.txt.utf-8")
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"max_workers\": 4}'",
    )
    parser.add_argument(
        "--insecure",
        action="store_true",
        help="Disable TLS certificate verification when downloading --text-url (not recommended).",
    )
    args = parser.parse_args()

    # TLS certificates are verified by default; verification is only disabled when explicitly requested, using the
    # public `ssl` API rather than the private `ssl._create_unverified_context()`.
    ssl_context = ssl.create_default_context()
    if args.insecure:
        ssl_context.check_hostname = False  # must be disabled before verify_mode can be set to CERT_NONE
        ssl_context.verify_mode = ssl.CERT_NONE

    with urlopen(args.text_url, context=ssl_context) as response:
        content = response.read().decode("utf-8").splitlines()

    with pf.set_parallel_backend_context(args.backend, **args.backend_args):
        counts = count_bigrams(content)

    # The counted items are two-letter bigrams, not words.
    print(f"Top {TOP_K} bigrams:")
    for bigram, count in counts.most_common(TOP_K):
        print(f"\t{bigram:<10}:\t{count}")
Parallel Random-Forest Training on California Housing Data
1"""
2Trains decision-tree regressors on the California housing dataset from scikit-learn and averages their predictions.
3
4Compares the training time of a sequential run with that of a run where Parfun splits the training dataset by rows.
5
6Usage:
7
8 $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
9 $ python -m examples.california_housing.main [--backend BACKEND] [--backend_args JSON]
10"""
11
12import argparse
13import json
14import timeit
15
16from typing import List
17
18import numpy as np
19import pandas as pd
20
21from sklearn.datasets import fetch_california_housing
22from sklearn.base import RegressorMixin
23from sklearn.tree import DecisionTreeRegressor
24
25import parfun as pf
26
27
class MeanRegressor(RegressorMixin):
    """
    Combines several fitted regressors by averaging their predictions.

    Used as the `combine_with` target of `train_regressor`, to merge the trees fitted on the individual partitions.
    """

    def __init__(self, regressors: List[RegressorMixin]) -> None:
        super().__init__()
        self._regressors = regressors

    def predict(self, X):
        """
        Returns, for every sample of `X`, the mean of the predictions of all the regressors.

        `axis=0` averages across regressors only. Without it, `np.mean` collapses every prediction of every sample
        into a single scalar.
        """
        predictions = [regressor.predict(X) for regressor in self._regressors]
        return np.mean(predictions, axis=0)
35
36
@pf.parallel(
    split=pf.per_argument(dataframe=pf.dataframe.by_row),
    combine_with=lambda regressors: MeanRegressor(list(regressors)),
)
def train_regressor(dataframe: pd.DataFrame, feature_names: List[str], target_name: str) -> RegressorMixin:
    """
    Fits a decision tree on one row partition of `dataframe`.

    Only `dataframe` is split (`feature_names` and `target_name` are presumably passed as-is to every call). The
    trees fitted on the partitions are merged into a `MeanRegressor`.
    """
    features = dataframe[feature_names]
    target = dataframe[[target_name]]
    return DecisionTreeRegressor().fit(features, target)
47
48
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"max_workers\": 4}'",
    )
    args = parser.parse_args()

    dataset = fetch_california_housing(download_if_missing=True)

    feature_names = dataset["feature_names"]
    target_name = dataset["target_names"][0]

    dataframe = pd.DataFrame(dataset["data"], columns=feature_names)
    dataframe[target_name] = dataset["target"]

    N_MEASURES = 5

    # The same measurement, first with the sequential backend, then with the requested backend.
    runs = [
        ("Sequential", "local_single_process", {}),
        ("Parallel", args.backend, args.backend_args),
    ]

    for label, backend, backend_kwargs in runs:
        with pf.set_parallel_backend_context(backend, **backend_kwargs):
            # Untimed first call; its result is not used.
            train_regressor(dataframe, feature_names, target_name)

            total_time = timeit.timeit(
                lambda: train_regressor(dataframe, feature_names, target_name),
                number=N_MEASURES,
            )
            duration = total_time / N_MEASURES

        print(f"{label} training duration:", duration)
Compute Electricity Production Statistics in Parallel
1"""
2Based on the monthly electricity production data from ENTSO-E, computes the monthly share (in %) of fossil, nuclear,
3renewable and other energy sources in the European electricity grid's production, and prints or plots it.
4
5Usage:
6
7 $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
8 $ pip install -r examples/requirements.txt
9 $ python -m examples.europe_electricity.main [--plot] [--backend BACKEND] [--backend_args JSON]
10
11"""
12
13import argparse
14import json
15from typing import List
16
17import pandas as pd
18
19import parfun as pf
20
21
def fetch_production_data(year: int) -> pd.DataFrame:
    """
    Downloads the ENTSO-E monthly domestic production values of `year`.

    Sourced from https://www.entsoe.eu/data/power-stats/. Only the year, month, category, country and provided value
    columns are returned.
    """
    url = f"https://www.entsoe.eu/publications/data/power-stats/{year}/monthly_domestic_values_{year}.csv"

    # The separator regex accepts tabs, commas or semicolons (presumably because the published files differ);
    # regex separators require the python parsing engine.
    data = pd.read_csv(url, sep=r"\t|,|;", engine="python")

    # Some newer datasets name the country column "Area".
    if "Area" in data.columns:
        data["Country"] = data["Area"]

    selected_columns = ["Year", "Month", "Category", "Country", "ProvidedValue"]
    return data[selected_columns]
38
39
def make_consumption_negative(production_data: pd.DataFrame) -> pd.DataFrame:
    """
    Converts consumption rows into negative production rows.

    Some production categories report consumption as positive values (e.g. "Consumption of Hydro Water Reservoir").
    This function renames those categories to their production counterpart (e.g. "Hydro Water Reservoir") and
    negates their values, which simplifies subsequent processing. The input dataframe is not modified; rows with a
    missing category are treated as production rows.
    """

    PREFIX = "Consumption of "

    result = production_data.copy()

    # `na=False` maps missing categories to False: a mask containing NaN would be rejected by `.loc`.
    is_consumption = result["Category"].str.startswith(PREFIX, na=False)

    # Strip only the leading prefix (every selected row starts with it), not other occurrences of the text.
    result.loc[is_consumption, "Category"] = result.loc[is_consumption, "Category"].str[len(PREFIX):]
    result.loc[is_consumption, "ProvidedValue"] *= -1

    return result
59
60
def group_production_by_type(production_data: pd.DataFrame) -> pd.DataFrame:
    """
    Sums the production values per year, month and energy type ("Fossil", "Nuclear", "Renewable" or "Other").

    Categories not listed below are classified as "Other". Returns the Year, Month, EnergyType and ProvidedValue
    columns.
    """
    energy_type_of = {}
    for energy_type, categories in (
        ("Fossil", (
            "Fossil Gas", "Fossil Hard coal", "Fossil Oil",
            "Fossil Brown coal/Lignite", "Fossil Coal-derived gas",
            "Fossil Oil shale", "Fossil Peat",
        )),
        ("Nuclear", ("Nuclear",)),
        ("Renewable", (
            "Biomass", "Solar", "Wind Onshore", "Wind Offshore", "Geothermal",
            "Hydro Pumped Storage", "Hydro Run-of-river and poundage",
            "Hydro Water Reservoir", "Marine", "Other renewable",
        )),
    ):
        energy_type_of.update(dict.fromkeys(categories, energy_type))

    energy_types = production_data["Category"].map(lambda category: energy_type_of.get(category, "Other"))
    labelled = production_data.drop(columns="Category").assign(EnergyType=energy_types)

    totals = labelled.groupby(["Year", "Month", "EnergyType"])["ProvidedValue"].sum()
    return totals.reset_index()
92
93
def monthly_percentage_production(production_data: pd.DataFrame) -> pd.DataFrame:
    """
    Returns the monthly production percentage for every month and every energy source type.

    The result has one row per month, indexed by the first day of that month and sorted chronologically, and one
    column per energy type. Rows are pivoted with `pivot_table`'s default aggregation (mean), so the input is
    expected to hold a single value per (Year, Month, EnergyType).
    """

    result = production_data.pivot_table(index=["Year", "Month"], columns="EnergyType", values="ProvidedValue")

    result = result.div(result.sum(axis=1), axis=0) * 100  # make it percentages

    # Uses datetime for year-month
    result.index = pd.to_datetime({
        "year": result.index.get_level_values(0),
        "month": result.index.get_level_values(1),
        "day": 1,
    })

    # `sort_index` returns a new dataframe; its result must be assigned, otherwise the call has no effect.
    result = result.sort_index(ascending=True)

    return result
111
112
@pf.parallel(
    split=pf.all_arguments(pf.py_list.by_chunk),
    combine_with=pf.dataframe.concat,
    initial_partition_size=2,
)
def get_monthly_percentage_production(years: List[int]) -> pd.DataFrame:
    """
    Downloads and processes the production data of every year in `years`.

    `years` is split in chunks (initially 2 years per chunk) and the per-chunk dataframes are concatenated.
    """

    def process_year(year: int) -> pd.DataFrame:
        raw = fetch_production_data(year)
        signed = make_consumption_negative(raw)
        by_type = group_production_by_type(signed)
        return monthly_percentage_production(by_type)

    return pd.concat([process_year(year) for year in years])
130
131
def plot_electricity_production(production_percentages: pd.DataFrame) -> None:
    """
    Shows a stacked bar chart of the monthly production share of each energy source type.

    Expects a dataframe indexed by dates, with one percentage column per energy type. The input dataframe is not
    modified. matplotlib is imported here, so it is only needed when plotting.
    """
    import matplotlib.pyplot as plt

    colors = {
        "Fossil": "lightcoral",
        "Nuclear": "violet",
        "Renewable": "lightgreen",
        "Other": "lightsteelblue",
    }

    # Relabel a shallow copy: assigning to `production_percentages.index` directly would replace the caller's
    # datetime index with strings.
    labelled = production_percentages.copy(deep=False)
    labelled.index = labelled.index.strftime("%b %Y")
    labelled.plot(kind="bar", stacked=True, figsize=(10, 6), width=1, color=colors)

    plt.title("Europe's monthly electricity production by source")
    plt.ylabel("Percentage (%)")
    plt.xlabel("Month")
    plt.legend(title="Energy source", loc="upper left")
    plt.grid(axis="y", linestyle="--")
    plt.ylim(0, 100)

    plt.tight_layout()
    plt.show()
154
155
def main():
    """Parses the command line, computes the 2019-2024 monthly production shares, then prints or plots them."""
    years = list(range(2019, 2025))

    parser = argparse.ArgumentParser()
    parser.add_argument("--plot", action="store_true", help="Plot data instead of printing dataframe")
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"n_workers\": 4}'",
    )
    args = parser.parse_args()

    with pf.set_parallel_backend_context(args.backend, **args.backend_args):
        processed_data = get_monthly_percentage_production(years)

    output = plot_electricity_production if args.plot else print
    output(processed_data)
180
181
if __name__ == "__main__":
    # Only runs when executed as a script, not when imported.
    main()
Compute Portfolio Metrics in Parallel
1"""
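2Based on a portfolio of stocks, computes per-country statistics (difference to the mean, squared difference to the mean and standardized value) of selected columns.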
3
4Usage:
5
6 $ git clone https://github.com/finos/opengris-parfun && cd opengris-parfun
7 $ python -m examples.portfolio_metrics.main [--backend BACKEND] [--backend_args JSON]
8"""
9
10import argparse
11import json
12from typing import List
13
14import pandas as pd
15
16import parfun as pf
17
18
@pf.parallel(
    split=pf.per_argument(portfolio=pf.dataframe.by_group(by="country")),
    combine_with=pf.dataframe.concat,
)
def relative_metrics(portfolio: pd.DataFrame, columns: List[str]) -> pd.DataFrame:
    """
    Computes, for each requested column and each country, how every value relates to its country's mean.

    Adds three columns per requested column: `<column>_diff_to_mean`, `<column>_sq_diff_to_mean` and
    `<column>_relative_to_mean`. The last one is the difference divided by the country's sample standard
    deviation, which is NaN for countries with a single row. The input dataframe is left unmodified.
    """
    output = portfolio.copy()

    for country in output["country"].unique():
        # The "country" column itself is never written, so the mask stays valid for all columns.
        in_country = output["country"] == country

        for column in columns:
            values = output.loc[in_country, column]
            deviation = values - values.mean()

            output.loc[in_country, f"{column}_diff_to_mean"] = deviation
            output.loc[in_country, f"{column}_sq_diff_to_mean"] = deviation ** 2
            output.loc[in_country, f"{column}_relative_to_mean"] = deviation / values.std()

    return output
43
44
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", default="local_multiprocessing")
    parser.add_argument(
        "--backend_args",
        type=json.loads,
        default={},
        help="JSON backend kwargs, e.g. '{\"n_workers\": 4}'",
    )
    args = parser.parse_args()

    portfolio = pd.DataFrame({
        "company": ["Apple", "Citigroup", "ASML", "Volkswagen", "Tencent"],
        "industry": ["technology", "banking", "technology", "manufacturing", "manufacturing"],
        "country": ["US", "US", "NL", "DE", "CN"],
        "market_cap": [2_828_000_000_000, 80_310_000_000, 236_000_000_000, 55_550_000_000, 345_000_000_000],
        "revenue": [397_000_000_000, 79_840_000_000, 27_180_000_000, 312_000_000_000, 79_000_000_000],
        "workforce": [161_000, 240_000, 39_850, 650_951, 104_503],
    })
    metric_columns = ["market_cap", "revenue"]

    with pf.set_parallel_backend_context(args.backend, **args.backend_args):
        metrics = relative_metrics(portfolio, metric_columns)

    print(metrics)