Timing parallel processing¶
This tutorial is meant to help you benchmark different parallel processing methods for the processing of molecules into graphs. This will allow you to choose the one most suitable for your machine, since the benchmarks vary per machine.
In general, we find that using joblib
with the loky
parallel processing and a batch size of 1000
is most beneficial. The logic is abstracted into `datamol.parallelized_with_batches`.
In [1]:
Copied!
# Reload edited modules automatically so code changes are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
import joblib
import numpy as np
import datamol as dm
import pandas as pd
from pandarallel import pandarallel
# One pandarallel worker per CPU, so the comparison with joblib's loky backend is fair.
pandarallel.initialize(progress_bar=True, nb_workers=joblib.cpu_count())
# Reload edited modules automatically so code changes are picked up without restarting the kernel.
%load_ext autoreload
%autoreload 2
import joblib
import numpy as np
import datamol as dm
import pandas as pd
from pandarallel import pandarallel
# One pandarallel worker per CPU, so the comparison with joblib's loky backend is fair.
pandarallel.initialize(progress_bar=True, nb_workers=joblib.cpu_count())
INFO: Pandarallel will run on 240 workers. INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.
Setup¶
In [2]:
Copied!
# download from https://raw.githubusercontent.com/aspuru-guzik-group/chemical_vae/master/models/zinc_properties/250k_rndm_zinc_drugs_clean_3.csv
# data = pd.read_csv("/home/hadim/250k_rndm_zinc_drugs_clean_3.csv", usecols=["smiles"])
# download from https://storage.googleapis.com/graphium-public/datasets/QM9/norm_qm9.csv
# Only the SMILES column is needed for this benchmark.
data = pd.read_csv("https://storage.googleapis.com/graphium-public/datasets/QM9/norm_qm9.csv", usecols=["smiles"])
# download from https://raw.githubusercontent.com/aspuru-guzik-group/chemical_vae/master/models/zinc_properties/250k_rndm_zinc_drugs_clean_3.csv
# data = pd.read_csv("/home/hadim/250k_rndm_zinc_drugs_clean_3.csv", usecols=["smiles"])
# download from https://storage.googleapis.com/graphium-public/datasets/QM9/norm_qm9.csv
# Only the SMILES column is needed for this benchmark.
data = pd.read_csv("https://storage.googleapis.com/graphium-public/datasets/QM9/norm_qm9.csv", usecols=["smiles"])
In [3]:
Copied!
# Row counts to benchmark (`.iloc[:n]` below caps this at the dataset size)
# and the batch sizes to compare for the batched runs.
rows_number_list = [250_000]
batch_size_list = [10, 100, 1_000, 10_000]
def smiles_to_unique_mol_id(smiles):
    """Convert a SMILES string into datamol's unique molecule id.

    Returns an empty string when the molecule cannot be parsed or no id can
    be computed, so a single bad input never aborts a benchmark run.
    """
    try:
        mol = dm.to_mol(mol=smiles)
        mol_id = dm.unique_id(mol)
    except Exception:
        # Was a bare `except:`, which also swallows KeyboardInterrupt/SystemExit
        # and makes long benchmark runs hard to interrupt. Exception is enough
        # to catch RDKit/datamol parsing failures.
        mol_id = ""
    if mol_id is None:
        mol_id = ""
    return mol_id
def smiles_to_unique_mol_id_batch(smiles_list):
    """Map `smiles_to_unique_mol_id` over a batch of SMILES strings.

    This is the per-batch worker used with `dm.parallelized_with_batches`.
    """
    return [smiles_to_unique_mol_id(smiles) for smiles in smiles_list]
# Row counts to benchmark (`.iloc[:n]` below caps this at the dataset size)
# and the batch sizes to compare for the batched runs.
rows_number_list = [250_000]
batch_size_list = [10, 100, 1_000, 10_000]
def smiles_to_unique_mol_id(smiles):
    """Convert a SMILES string into datamol's unique molecule id.

    Returns an empty string when the molecule cannot be parsed or no id can
    be computed, so a single bad input never aborts a benchmark run.
    """
    try:
        mol = dm.to_mol(mol=smiles)
        mol_id = dm.unique_id(mol)
    except Exception:
        # Was a bare `except:`, which also swallows KeyboardInterrupt/SystemExit
        # and makes long benchmark runs hard to interrupt. Exception is enough
        # to catch RDKit/datamol parsing failures.
        mol_id = ""
    if mol_id is None:
        mol_id = ""
    return mol_id
def smiles_to_unique_mol_id_batch(smiles_list):
    """Map `smiles_to_unique_mol_id` over a batch of SMILES strings.

    This is the per-batch worker used with `dm.parallelized_with_batches`.
    """
    return [smiles_to_unique_mol_id(smiles) for smiles in smiles_list]
Benchmarks¶
In [4]:
Copied!
# Accumulates one result record (dict) per benchmark run.
benchmark = []
benchmark = []
No batch¶
In [5]:
Copied!
for n in rows_number_list:
    df = data.iloc[:n]

    # Time the unbatched parallel conversion: one loky task per molecule.
    with dm.utils.perf.watch_duration(log=False) as timer:
        out = dm.parallelized(
            smiles_to_unique_mol_id,
            df["smiles"].values,
            progress=True,
            n_jobs=-1,
            scheduler="processes",
        )

    benchmark.append(
        {
            "batch": False,
            "batch_size": None,
            "scheduler": "loky_processes",
            "duration_minutes": timer.duration_minutes,
            "duration_seconds": timer.duration,
            "n_rows": len(df),
        }
    )
for n in rows_number_list:
    df = data.iloc[:n]

    # Time the unbatched parallel conversion: one loky task per molecule.
    with dm.utils.perf.watch_duration(log=False) as timer:
        out = dm.parallelized(
            smiles_to_unique_mol_id,
            df["smiles"].values,
            progress=True,
            n_jobs=-1,
            scheduler="processes",
        )

    benchmark.append(
        {
            "batch": False,
            "batch_size": None,
            "scheduler": "loky_processes",
            "duration_minutes": timer.duration_minutes,
            "duration_seconds": timer.duration,
            "n_rows": len(df),
        }
    )
0%| | 0/133885 [00:00<?, ?it/s]
Batch¶
In [5]:
Copied!
for batch_size in batch_size_list:
    for n in rows_number_list:
        df = data.iloc[:n]

        # Time the batched parallel conversion: one loky task per batch of SMILES.
        with dm.utils.perf.watch_duration(log=False) as timer:
            out = dm.parallelized_with_batches(
                smiles_to_unique_mol_id_batch,
                df["smiles"].values,
                batch_size=batch_size,
                progress=True,
                n_jobs=-1,
                scheduler="processes",
            )

        # Sanity check: batching must not drop or duplicate results.
        assert len(out) == len(df), f"{len(out)} != {len(df)}"

        benchmark.append(
            {
                "batch": True,
                "batch_size": batch_size,
                "scheduler": "loky_processes",
                "duration_minutes": timer.duration_minutes,
                "duration_seconds": timer.duration,
                "n_rows": len(df),
            }
        )
for batch_size in batch_size_list:
    for n in rows_number_list:
        df = data.iloc[:n]

        # Time the batched parallel conversion: one loky task per batch of SMILES.
        with dm.utils.perf.watch_duration(log=False) as timer:
            out = dm.parallelized_with_batches(
                smiles_to_unique_mol_id_batch,
                df["smiles"].values,
                batch_size=batch_size,
                progress=True,
                n_jobs=-1,
                scheduler="processes",
            )

        # Sanity check: batching must not drop or duplicate results.
        assert len(out) == len(df), f"{len(out)} != {len(df)}"

        benchmark.append(
            {
                "batch": True,
                "batch_size": batch_size,
                "scheduler": "loky_processes",
                "duration_minutes": timer.duration_minutes,
                "duration_seconds": timer.duration,
                "n_rows": len(df),
            }
        )
0%| | 0/13388 [00:00<?, ?it/s]
0%| | 0/1338 [00:00<?, ?it/s]
0%| | 0/133 [00:00<?, ?it/s]
0%| | 0/13 [00:00<?, ?it/s]
In [7]:
Copied!
for n in rows_number_list:
    df = data.iloc[:n]

    # Time pandarallel's `parallel_apply` on the same per-molecule workload.
    with dm.utils.perf.watch_duration(log=False) as timer:
        _ = df["smiles"].parallel_apply(smiles_to_unique_mol_id)

    benchmark.append(
        {
            "batch": False,
            "batch_size": None,
            "scheduler": "pandarallel",
            "duration_minutes": timer.duration_minutes,
            "duration_seconds": timer.duration,
            "n_rows": len(df),
        }
    )
for n in rows_number_list:
    df = data.iloc[:n]

    # Time pandarallel's `parallel_apply` on the same per-molecule workload.
    with dm.utils.perf.watch_duration(log=False) as timer:
        _ = df["smiles"].parallel_apply(smiles_to_unique_mol_id)

    benchmark.append(
        {
            "batch": False,
            "batch_size": None,
            "scheduler": "pandarallel",
            "duration_minutes": timer.duration_minutes,
            "duration_seconds": timer.duration,
            "n_rows": len(df),
        }
    )
VBox(children=(HBox(children=(IntProgress(value=0, description='0.00%', max=558), Label(value='0 / 558'))), HB…
Results¶
In [8]:
Copied!
# Normalize by row count so runs over different dataset sizes are comparable.
b = pd.DataFrame(benchmark).assign(
    duration_seconds_per_mol=lambda frame: frame["duration_seconds"] / frame["n_rows"]
)
# Fastest configuration first.
b.sort_values("duration_seconds_per_mol")
# Normalize by row count so runs over different dataset sizes are comparable.
b = pd.DataFrame(benchmark).assign(
    duration_seconds_per_mol=lambda frame: frame["duration_seconds"] / frame["n_rows"]
)
# Fastest configuration first.
b.sort_values("duration_seconds_per_mol")
Out[8]:
batch | batch_size | scheduler | duration_minutes | duration_seconds | n_rows | duration_seconds_per_mol | |
---|---|---|---|---|---|---|---|
3 | True | 1000.0 | loky_processes | 0.014199 | 0.851930 | 133885 | 0.000006 |
2 | True | 100.0 | loky_processes | 0.037132 | 2.227947 | 133885 | 0.000017 |
4 | True | 10000.0 | loky_processes | 0.047438 | 2.846266 | 133885 | 0.000021 |
5 | False | NaN | pandarallel | 0.118230 | 7.093791 | 133885 | 0.000053 |
1 | True | 10.0 | loky_processes | 0.222177 | 13.330603 | 133885 | 0.000100 |
0 | False | NaN | loky_processes | 4.002346 | 240.140754 | 133885 | 0.001794 |
In [ ]:
Copied!
In [ ]:
Copied!