"""Write and manage trends dataset."""
2
3import hashlib
4import json
5from glob import glob
6from math import ceil
7from os import listdir, makedirs, unlink
8from os.path import abspath, dirname, isfile
9from time import time
10from urllib.parse import quote_plus
11from typing import Dict, Union
12
13import pyarrow
14import pyarrow.dataset
15import pyarrow.parquet
16from pandas import DataFrame
17
# Flat Arrow schema for trend observations: one numeric value keyed by
# date, location, and search term, plus the retrieval timestamp.
gtrends_schema = pyarrow.schema(
    [pyarrow.field("value", pyarrow.float64())]
    + [
        pyarrow.field(name, pyarrow.string())
        for name in ("date", "location", "term", "retrieved")
    ]
)
27
28
def write_to_dataset(data: DataFrame, data_dir: str = "data", defragment: bool = True):
    """
    Append per-term fragment files to a Parquet dataset.

    Each term group is written as a timestamped fragment under a
    `term=<url-encoded term>` partition directory, then the dataset is
    optionally defragmented and the status log refreshed.

    Args:
        data (DataFrame): Collection results.
        data_dir (str): Directory of the Parquet dataset.
        defragment (bool): If `True`, defragments the dataset after writing new fragments.
    """
    for term, term_rows in data.groupby("term"):
        # URL-encode the term so it is safe to use as a directory name.
        partition_dir = f"{data_dir}/term={quote_plus(term)}/"
        makedirs(partition_dir, exist_ok=True)
        fragment_path = f"{partition_dir}fragment-{ceil(time())!s}-0.parquet"
        table = pyarrow.Table.from_pandas(term_rows, schema=gtrends_schema)
        pyarrow.parquet.write_table(table, fragment_path, compression="gzip")
    if defragment:
        defragment_dataset(data_dir)
    update_status(data_dir)
50
51
def defragment_dataset(data_dir: str = "data"):
    """
    Defragments the dataset partitions.

    For each partition directory, reads every valid Parquet file (existing
    part plus any fragments) into a single table, rewrites it as
    `part-0.parquet`, and deletes the now-redundant fragment files.

    Args:
        data_dir (str): directory of the Parquet dataset.
    """
    for partition_name in listdir(data_dir):
        partition_dir = f"{data_dir}/{partition_name}/"
        combined = pyarrow.dataset.dataset(
            partition_dir, gtrends_schema, format="parquet", exclude_invalid_files=True
        ).to_table()
        pyarrow.parquet.write_table(
            combined, f"{partition_dir}part-0.parquet", compression="gzip"
        )
        # Fragments are merged into part-0.parquet above, so remove them.
        for leftover in glob(f"{partition_dir}fragment*.parquet"):
            unlink(leftover)
67
68
def update_status(data_dir: str, log_file: Union[str, None] = None):
    """
    Records state of data files.

    Loads the existing log (if any), updates the MD5 hash entry for every
    Parquet file currently in the dataset, and writes the merged state back
    as JSON. Entries for files that no longer exist are retained.

    Args:
        data_dir (str): directory of a Parquet dataset.
        log_file (str): path to the log file. Defaults to `status.json`
            placed next to `data_dir`.
    """
    if log_file is None:
        log_file = dirname(abspath(data_dir)) + "/status.json"
    if isfile(log_file):
        with open(log_file, "r", encoding="utf-8") as file:
            state: Dict[str, str] = json.load(file)
    else:
        state = {}
    # recursive=True makes "**" actually descend into partition directories;
    # without it "**" behaves like a single-level "*".
    for data_file in glob(f"{data_dir}/**/*.parquet", recursive=True):
        with open(data_file, "rb") as file_buffer:
            content = file_buffer.read()
        # Normalize Windows path separators so keys are stable across
        # platforms. MD5 is a change detector here, not a security hash.
        state[data_file.replace("\\", "/")] = hashlib.md5(content).hexdigest()
    with open(log_file, "w", encoding="utf-8") as file:
        json.dump(state, file, indent=2)