Coverage for src\gtrends_collection\dataset.py: 97%
25 statements
« prev ^ index » next coverage.py v7.9.0, created at 2025-06-12 05:25 -0400
« prev ^ index » next coverage.py v7.9.0, created at 2025-06-12 05:25 -0400
1"""Write and manage trends dataset."""
3from glob import glob
4from math import ceil
5from os import listdir, makedirs, unlink
6from time import time
7from urllib.parse import quote_plus
9import pyarrow
10import pyarrow.dataset
11import pyarrow.parquet
12from pandas import DataFrame
# Column layout shared by every Parquet file in the trends dataset.
# Each entry is a (column name, Arrow type) pair; only `value` is numeric,
# all other columns are stored as strings.
gtrends_schema = pyarrow.schema(
    [
        ("value", pyarrow.float64()),
        ("date", pyarrow.string()),
        ("location", pyarrow.string()),
        ("term", pyarrow.string()),
        ("retrieved", pyarrow.string()),
    ]
)
def write_to_dataset(data: DataFrame, data_dir: str = "data", defragment: bool = True):
    """
    Write term fragments to a Parquet dataset.

    Rows are grouped by their `term` value and each group is written as a new
    gzip-compressed fragment under `{data_dir}/term={URL-encoded term}/`.
    Fragments are named `fragment-{timestamp}-{index}.parquet`, where the index
    is the first one not already used in that partition directory, so an
    existing fragment is never overwritten.

    Args:
        data (DataFrame): Collection results.
        data_dir (str): Directory of the Parquet dataset.
        defragment (bool): If `True`, defragments the dataset after writing new fragments.
    """
    for term, group in data.groupby("term"):
        encoded_term = quote_plus(term)
        part_dir = f"{data_dir}/term={encoded_term}/"
        makedirs(part_dir, exist_ok=True)

        # The timestamp only has one-second resolution, so two writes for the
        # same term within a second would otherwise target the same file and
        # the later write would replace the earlier fragment.
        existing = set(listdir(part_dir))
        stamp = ceil(time())
        index = 0
        while f"fragment-{stamp!s}-{index}.parquet" in existing:
            index += 1

        pyarrow.parquet.write_table(
            pyarrow.Table.from_pandas(group, schema=gtrends_schema),
            f"{part_dir}fragment-{stamp!s}-{index}.parquet",
            compression="gzip",
        )
    if defragment:
        defragment_dataset(data_dir)
def defragment_dataset(data_dir: str = "data"):
    """
    Defragments the dataset partitions.

    For every partition directory directly inside `data_dir`, all Parquet files
    in it are read into one table, that table is written to `part-0.parquet`
    (gzip-compressed), and the `fragment*.parquet` files are then removed.
    Entries of `data_dir` that are not directories are skipped.

    Args:
        data_dir (str): directory of the Parquet dataset.
    """
    # Imported here so the block does not depend on a change to the module's import list.
    from os.path import isdir

    for part_name in listdir(data_dir):
        part_dir = f"{data_dir}/{part_name}/"
        if not isdir(part_dir):
            # Stray files next to the partitions (e.g. a README) are not partitions.
            continue
        # Materialize the whole partition first: `part-0.parquet` may be one of
        # the files being read, and it is overwritten just below.
        part = pyarrow.dataset.dataset(
            part_dir, gtrends_schema, format="parquet", exclude_invalid_files=True
        ).to_table()
        pyarrow.parquet.write_table(part, f"{part_dir}part-0.parquet", compression="gzip")
        # The fragments' rows now live in `part-0.parquet`, so drop the fragments.
        for fragment in glob(f"{part_dir}fragment*.parquet"):
            unlink(fragment)