dataset

Write and manage the trends dataset.

defragment_dataset(data_dir='data')

Defragments the dataset partitions, consolidating each partition's fragment files into a single part file.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data_dir` | `str` | Directory of the Parquet dataset. | `'data'` |
Source code in gtrends_collection/dataset.py
```python
def defragment_dataset(data_dir: str = "data"):
    """
    Defragments the dataset partitions.

    Args:
      data_dir (str): directory of the Parquet dataset.
    """
    for part_name in listdir(data_dir):
        part_dir = f"{data_dir}/{part_name}/"
        # Read every valid Parquet file in the partition (part and fragment
        # files alike) into a single in-memory table.
        part = pyarrow.dataset.dataset(
            part_dir, gtrends_schema, format="parquet", exclude_invalid_files=True
        ).to_table()
        # Rewrite the partition as one consolidated part file, then delete
        # the now-redundant fragment files.
        pyarrow.parquet.write_table(part, f"{part_dir}part-0.parquet", compression="gzip")
        for fragment in glob(f"{part_dir}fragment*.parquet"):
            unlink(fragment)
```
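
As a usage sketch (assuming `./data` was previously populated by `write_to_dataset` with `defragment=False`), a later cleanup pass is a single call:

```python
from gtrends_collection.dataset import defragment_dataset

# Consolidate every partition under ./data into a single part-0.parquet,
# deleting the now-redundant fragment-*.parquet files.
defragment_dataset(data_dir="data")
```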

write_to_dataset(data, data_dir='data', defragment=True)

Write collection results to a Parquet dataset, one fragment per term.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `data` | `DataFrame` | Collection results. | *required* |
| `data_dir` | `str` | Directory of the Parquet dataset. | `'data'` |
| `defragment` | `bool` | If `True`, defragments the dataset after writing new fragments. | `True` |
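
For orientation, the layout produced by the source below is Hive-style, with one directory per URL-encoded term. An illustrative tree, assuming the terms "python" and "rust lang" were collected and the dataset was then defragmented:

```
data/
├── term=python/
│   └── part-0.parquet
└── term=rust+lang/
    └── part-0.parquet
```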
Source code in gtrends_collection/dataset.py
```python
def write_to_dataset(data: DataFrame, data_dir: str = "data", defragment: bool = True):
    """
    Write term fragments to a Parquet dataset.

    Args:
      data (DataFrame): Collection results.
      data_dir (str): Directory of the Parquet dataset.
      defragment (bool): If `True`, defragments the dataset after writing new fragments.
    """
    for term, group in data.groupby("term"):
        # URL-encode the term so it is safe to use in a partition directory name.
        encoded_term = quote_plus(term)
        part_dir = f"{data_dir}/term={encoded_term}/"
        makedirs(part_dir, exist_ok=True)
        # Write the term's rows as a fragment file named with the current Unix time.
        pyarrow.parquet.write_table(
            pyarrow.Table.from_pandas(group, schema=gtrends_schema),
            f"{part_dir}fragment-{ceil(time())!s}-0.parquet",
            compression="gzip",
        )
    if defragment:
        defragment_dataset(data_dir)
```
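
A minimal end-to-end sketch; the columns other than `term` are hypothetical stand-ins, since the real frame must match `gtrends_schema`:

```python
from pandas import DataFrame

from gtrends_collection.dataset import write_to_dataset

# Hypothetical collection results; the actual columns are dictated by gtrends_schema.
results = DataFrame({
    "term": ["python", "rust lang"],
    "value": [81, 54],
})

# One fragment per term is written under data/term=<encoded term>/;
# with defragment=True, each partition is then consolidated into part-0.parquet.
write_to_dataset(results, data_dir="data", defragment=True)
```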