Coverage for C:\Users\Admin\AppData\Local\hatch\env\virtual\gtrends-collection\QF5Wa-es\gtrends-collection\Lib\site-packages\gtrends_collection\dataset.py: 92%

45 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-05 04:45 -0500

1"""Write and manage trends dataset.""" 

2 

3import hashlib 

4import json 

5from glob import glob 

6from math import ceil 

7from os import listdir, makedirs, unlink 

8from os.path import abspath, dirname, isfile 

9from time import time 

10from urllib.parse import quote_plus 

11from typing import Dict, Union 

12 

13import pyarrow 

14import pyarrow.dataset 

15import pyarrow.parquet 

16from pandas import DataFrame 

17 

# Arrow schema shared by every fragment and consolidated partition file:
# a single numeric "value" column plus four string metadata columns.
gtrends_schema = pyarrow.schema(
    [pyarrow.field("value", pyarrow.float64())]
    + [
        pyarrow.field(column, pyarrow.string())
        for column in ("date", "location", "term", "retrieved")
    ]
)

27 

28 

def write_to_dataset(data: DataFrame, data_dir: str = "data", defragment: bool = True):
    """
    Write term fragments to a Parquet dataset.

    Args:
        data (DataFrame): Collection results.
        data_dir (str): Directory of the Parquet dataset.
        defragment (bool): If `True`, defragments the dataset after writing new fragments.
    """
    # One Hive-style partition directory per term: term=<url-encoded term>.
    for term, term_rows in data.groupby("term"):
        partition = f"{data_dir}/term={quote_plus(term)}/"
        makedirs(partition, exist_ok=True)
        # Timestamped fragment name keeps successive writes from clobbering
        # earlier ones; defragmentation later merges them into part-0.parquet.
        fragment_path = f"{partition}fragment-{ceil(time())!s}-0.parquet"
        table = pyarrow.Table.from_pandas(term_rows, schema=gtrends_schema)
        pyarrow.parquet.write_table(table, fragment_path, compression="gzip")
    if defragment:
        defragment_dataset(data_dir)
    update_status(data_dir)

50 

51 

def defragment_dataset(data_dir: str = "data"):
    """
    Defragments the dataset partitions.

    Args:
        data_dir (str): directory of the Parquet dataset.
    """
    for partition_name in listdir(data_dir):
        partition = f"{data_dir}/{partition_name}/"
        # Read everything parquet-readable in the partition (consolidated
        # part file plus any new fragments) as a single table.
        consolidated = pyarrow.dataset.dataset(
            partition, gtrends_schema, format="parquet", exclude_invalid_files=True
        ).to_table()
        pyarrow.parquet.write_table(
            consolidated, f"{partition}part-0.parquet", compression="gzip"
        )
        # Fragments are now redundant; only the consolidated part file remains.
        for stale_fragment in glob(f"{partition}fragment*.parquet"):
            unlink(stale_fragment)

67 

68 

def update_status(data_dir: str, log_file: Union[str, None] = None):
    """
    Records state of data files.

    Writes a JSON map of file path to MD5 content hash for every Parquet file
    under `data_dir`, merged into any existing log file so previously recorded
    entries (e.g. for files that no longer exist) are preserved.

    Args:
        data_dir (str): directory of a Parquet dataset.
        log_file (str): path to the log file; defaults to `status.json`
            alongside `data_dir`.
    """
    if log_file is None:
        log_file = dirname(abspath(data_dir)) + "/status.json"
    # NOTE: removed leftover debug `print(log_file)` that polluted stdout.
    files = glob(f"{data_dir}/**/*.parquet")
    # Start from the existing log (if any) so prior entries are retained.
    if isfile(log_file):
        with open(log_file, "r", encoding="utf-8") as file:
            state = json.load(file)
    else:
        state: Dict[str, str] = {}
    for data_file in files:
        with open(data_file, "rb") as file_buffer:
            content = file_buffer.read()
        # MD5 here is a cheap change-detection checksum, not a security hash.
        # Keys are normalized to forward slashes for cross-platform stability.
        state[data_file.replace("\\", "/")] = hashlib.md5(content).hexdigest()
    with open(log_file, "w", encoding="utf-8") as file:
        json.dump(state, file, indent=2)