Coverage for src\gtrends_collection\dataset.py: 97%

25 statements  

coverage.py v7.9.0, created at 2025-06-12 05:25 -0400

1"""Write and manage trends dataset.""" 

2 

3from glob import glob 

4from math import ceil 

5from os import listdir, makedirs, unlink 

6from time import time 

7from urllib.parse import quote_plus 

8 

9import pyarrow 

10import pyarrow.dataset 

11import pyarrow.parquet 

12from pandas import DataFrame 

13 

14gtrends_schema = pyarrow.schema( 

15 [ 

16 pyarrow.field("value", pyarrow.float64()), 

17 pyarrow.field("date", pyarrow.string()), 

18 pyarrow.field("location", pyarrow.string()), 

19 pyarrow.field("term", pyarrow.string()), 

20 pyarrow.field("retrieved", pyarrow.string()), 

21 ] 

22) 

23 

24 

25def write_to_dataset(data: DataFrame, data_dir: str = "data", defragment: bool = True): 

26 """ 

27 Write term fragments to a Parquet dataset. 

28 

29 Args: 

30 data (DataFrame): Collection results. 

31 data_dir (str): Directory of the Parquet dataset. 

32 defragment (bool): If `True`, defragments the dataset after writing new fragments. 

33 """ 

34 for term, group in data.groupby("term"): 

35 encoded_term = quote_plus(term) 

36 part_dir = f"{data_dir}/term={encoded_term}/" 

37 makedirs(part_dir, exist_ok=True) 

38 pyarrow.parquet.write_table( 

39 pyarrow.Table.from_pandas(group, schema=gtrends_schema), 

40 f"{part_dir}fragment-{ceil(time())!s}-0.parquet", 

41 compression="gzip", 

42 ) 

43 if defragment: 43 ↛ exitline 43 didn't return from function 'write_to_dataset' because the condition on line 43 was always true

44 defragment_dataset(data_dir) 

45 

46 

47def defragment_dataset(data_dir: str = "data"): 

48 """ 

49 Defragments the dataset partitions. 

50 

51 Args: 

52 data_dir (str): directory of the Parquet dataset. 

53 """ 

54 for part_name in listdir(data_dir): 

55 part_dir = f"{data_dir}/{part_name}/" 

56 part = pyarrow.dataset.dataset( 

57 part_dir, gtrends_schema, format="parquet", exclude_invalid_files=True 

58 ).to_table() 

59 pyarrow.parquet.write_table(part, f"{part_dir}part-0.parquet", compression="gzip") 

60 for fragment in glob(f"{part_dir}fragment*.parquet"): 

61 unlink(fragment)
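
A minimal usage sketch (not part of the covered module; the import path gtrends_collection.dataset is inferred from the file path above, and the example values are illustrative only):

    from pandas import DataFrame
    from gtrends_collection.dataset import write_to_dataset

    # One row per (term, location, date); columns match gtrends_schema.
    results = DataFrame(
        {
            "value": [42.0],
            "date": ["2025-06-01"],
            "location": ["US"],
            "term": ["example term"],
            "retrieved": ["2025-06-12"],
        }
    )

    # Writes data/term=example+term/fragment-<timestamp>-0.parquet for the term,
    # then consolidates each partition into part-0.parquet (defragment defaults to True).
    write_to_dataset(results, data_dir="data")

After defragmentation, each term partition holds a single consolidated file, e.g. data/term=example+term/part-0.parquet.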