Skip to content

dataset

Write and manage trends dataset.

defragment_dataset(data_dir='data')

Defragments the dataset partitions.

Parameters:

Name Type Description Default
data_dir str

directory of the Parquet dataset.

'data'
Source code in gtrends_collection/dataset.py
def defragment_dataset(data_dir: str = "data"):
    """
    Defragments the dataset partitions.

    Merges every partition's fragment files (plus any existing part file)
    into a single `part-0.parquet`, then removes the merged fragments.

    Args:
      data_dir (str): directory of the Parquet dataset.
    """
    for partition in listdir(data_dir):
        partition_path = f"{data_dir}/{partition}/"
        # Scan the whole partition (existing part-0 included) into one table;
        # exclude_invalid_files skips anything that is not valid Parquet.
        merged = pyarrow.dataset.dataset(
            partition_path, gtrends_schema, format="parquet", exclude_invalid_files=True
        ).to_table()
        pyarrow.parquet.write_table(
            merged, f"{partition_path}part-0.parquet", compression="gzip"
        )
        # Fragments are now redundant; drop them.
        for fragment_file in glob(f"{partition_path}fragment*.parquet"):
            unlink(fragment_file)

update_status(data_dir, log_file=None)

Records state of data files.

Parameters:

Name Type Description Default
data_dir str

directory of a Parquet dataset.

required
log_file str

path to the log file.

None
Source code in gtrends_collection/dataset.py
def update_status(data_dir: str, log_file: Union[str, None] = None):
    """
    Records state of data files.

    Hashes every Parquet file under `data_dir` and stores the path -> MD5
    mapping in a JSON log, merging with any previously recorded state.

    Args:
      data_dir (str): directory of a Parquet dataset.
      log_file (str): path to the log file. Defaults to `data_state.json`
        placed next to `data_dir`.
    """
    if log_file is None:
        # Default log location: sibling of the dataset directory.
        log_file = dirname(abspath(data_dir)) + "/data_state.json"
    # recursive=True makes `**` match any partition depth; without it the
    # pattern silently degrades to a single `*` (one level only).
    files = glob(f"{data_dir}/**/*.parquet", recursive=True)
    if isfile(log_file):
        with open(log_file, "r", encoding="utf-8") as log_buffer:
            state: Dict[str, str] = json.load(log_buffer)
    else:
        state = {}
    for path in files:
        with open(path, "rb") as file_buffer:
            content = file_buffer.read()
            # Normalize Windows separators so keys are stable across platforms.
            state[path.replace("\\", "/")] = hashlib.md5(content).hexdigest()
    with open(log_file, "w", encoding="utf-8") as log_buffer:
        json.dump(state, log_buffer, indent=2)

write_to_dataset(data, data_dir='data', defragment=True)

Write term fragments to a Parquet dataset.

Parameters:

Name Type Description Default
data DataFrame

Collection results.

required
data_dir str

Directory of the Parquet dataset.

'data'
defragment bool

If True, defragments the dataset after writing new fragments.

True
Source code in gtrends_collection/dataset.py
def write_to_dataset(data: DataFrame, data_dir: str = "data", defragment: bool = True):
    """
    Write term fragments to a Parquet dataset.

    Each term group is written as a new gzip-compressed fragment under a
    `term=<urlencoded term>` partition directory, then the dataset status
    log is refreshed.

    Args:
      data (DataFrame): Collection results.
      data_dir (str): Directory of the Parquet dataset.
      defragment (bool): If `True`, defragments the dataset after writing new fragments.
    """
    for term, group in data.groupby("term"):
        encoded_term = quote_plus(term)
        part_dir = f"{data_dir}/term={encoded_term}/"
        makedirs(part_dir, exist_ok=True)
        # Probe for the first free fragment index: the previous hard-coded
        # `-0` suffix meant two writes for the same term within the same
        # second overwrote each other's fragment.
        stamp = ceil(time())
        index = 0
        while isfile(f"{part_dir}fragment-{stamp!s}-{index}.parquet"):
            index += 1
        pyarrow.parquet.write_table(
            pyarrow.Table.from_pandas(group, schema=gtrends_schema),
            f"{part_dir}fragment-{stamp!s}-{index}.parquet",
            compression="gzip",
        )
    if defragment:
        defragment_dataset(data_dir)
    update_status(data_dir)