Coverage for src\gtrends_collection\collector.py: 81%

68 statements  

« prev     ^ index     » next       coverage.py v7.9.0, created at 2025-06-12 05:39 -0400

1"""Collect Google Trends health data.""" 

2 

3import datetime 

4from os import getenv 

5from os.path import isfile 

6from time import sleep 

7from typing import ClassVar, Dict, List, Union 

8 

9from apiclient import discovery, errors 

10from pandas import DataFrame, concat, json_normalize, to_datetime 

11 

12from gtrends_collection.utils import read_scope 

13 

14 

class Collector:
    """
    Collect internet search volumes from the Google Trends timeline for health endpoint.

    See the [schema](https://trends.googleapis.com/$discovery/rest?version=v1beta)
    for more about the API. Only the `getTimelinesForHealth` endpoint is used here.

    Args:
        scope_dir (str): Directory containing the `terms.txt` and `locations.txt` files.
            See Specification.
        key_dir (str): Directory containing a `.env` file, to extract the
            `GOOGLE_API_KEY` variable from, if it is not already in the environment.
        terms_per_batch (int): Maximum terms to include in each collection batch.
            Theoretically 30 is the API's max, but more than 1 seems to not work.
        wait_time (float): Seconds to wait between each batch.
        version (str): Version of the service API.

    Raises:
        RuntimeError: If no API key is found in the environment or the `.env` file.

    Specification:
        To process in batches, search terms and locations must be specified in separate
        files (`terms.txt` and `locations.txt`), stored in the `scope_dir` directory.
        These should contain 1 term / location code per line.

    Collection Process:
        Initializing this class retrieves the Google API service, stores the
        developer key, and points to the scope directory.

        The `process_batches()` method reads in the terms and locations,
        and collects them in batches over the specified time frame.

        Results from each batch are stored in the `batches` property,
        which can be pulled from in case the `process_batches` process does not complete
        (such as if the daily rate limit is reached).

        The `collect()` method collects a single batch, and
        can be used on its own.

    Examples:
        ```python
        from gtrends_collection import Collector

        # initialize the collector
        collector = Collector()
        ```
    """

    # time to wait between requests
    _regular_wait_time = 0.1
    # time to wait after a `rateLimitExceeded` error
    _fallback_wait_time = 2
    # rate-limited retries attempted for a single batch before the error is re-raised
    _max_retries = 60
    # columns of the frames returned by `collect()`
    _columns = ("value", "date", "location", "term", "retrieved")
    # results of every batch collected by this instance (created in `__init__`)
    batches: List[DataFrame]

    scope_dir = "scope"
    max_terms = 1

    def __init__(
        self,
        scope_dir: str = "scope",
        key_dir: str = ".",
        terms_per_batch: int = 1,
        wait_time: float = 0.1,
        version: str = "v1beta",
    ):
        self._regular_wait_time = wait_time
        self.scope_dir = scope_dir
        self.max_terms = terms_per_batch
        # per-instance list, so separate collectors never share or re-return results
        self.batches = []

        # the environment takes precedence; the `.env` file is only a fallback
        key = getenv("GOOGLE_API_KEY")
        env_file = f"{key_dir}/.env"
        if not key and isfile(env_file):
            with open(env_file, encoding="utf-8") as content:
                key = self._parse_env_key(content.read())
        if not key:
            msg = "no API key found (GOOGLE_API_KEY environment variable)"
            raise RuntimeError(msg)

        self.service = discovery.build(
            "trends",
            version,
            discoveryServiceUrl=f"https://trends.googleapis.com/$discovery/rest?version={version}",
            developerKey=key,
        )

    @staticmethod
    def _parse_env_key(text: str) -> Union[str, None]:
        """
        Extract the `GOOGLE_API_KEY` value from the text of a `.env` file.

        Lines without a `=` (blank lines, comments) and other variables are skipped.
        Only the first `=` separates name from value, so values may contain `=`.
        Surrounding whitespace and one pair of matching quotes are removed.

        Args:
            text (str): Full content of the `.env` file.

        Returns:
            The value of the first `GOOGLE_API_KEY` entry, or `None` if there is none.
        """
        for line in text.splitlines():
            name, separator, value = line.partition("=")
            if not separator or name.strip() != "GOOGLE_API_KEY":
                continue
            value = value.strip()
            if len(value) > 1 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            return value
        return None

    def process_batches(
        self,
        start: Union[str, None] = None,
        end: Union[str, None] = None,
        resolution: str = "week",
        override_terms: Union[List[str], None] = None,
        override_location: Union[List[str], None] = None,
    ) -> DataFrame:
        """
        Processes collection batches from scope.

        Each collected batch is appended to the `batches` property, which is not
        cleared between calls, so the returned frame also includes the results of
        earlier calls on the same collector.

        Args:
            start (str | None): First date to collect from; `YYYY-MM-DD`.
            end (str | None): Last date to collect from; `YYYY-MM-DD`.
            resolution (str): Collection resolution; `day`, `week`, `month`, or `year`.
            override_terms (list[str] | None): List of terms to collect instead of those in scope.
                Useful for changing collection order or filling out select terms.
            override_location (list[str] | None): List of locations to collect from
                instead of those in scope.

        Examples:
            ```python
            # collect across all scope-defined terms and locations in 2024
            data = collector.process_batches("2024-01-01", "2024-12-31")
            ```

        Returns:
            A `pandas.DataFrame` of the combined results.
        """

        params: Dict[str, Union[List[str], str]] = {"timelineResolution": resolution}
        if start:
            params["time_startDate"] = start
        if end:
            params["time_endDate"] = end

        terms = override_terms if override_terms else read_scope(self.scope_dir, "terms")
        locations = override_location if override_location else read_scope(self.scope_dir, "locations")
        # Full DMA codes (e.g., `US-AL-630`) are reduced to their numeric part.
        # `dict.fromkeys` drops duplicates while keeping the given order, so the
        # request order is the same on every run.
        locations = list(dict.fromkeys(loc if len(loc) < 9 else loc.split("-")[2] for loc in locations))

        for term_set in range(0, len(terms), self.max_terms):
            for location in locations:
                batch_params = {
                    "terms": terms[term_set : (term_set + self.max_terms)],
                    **params,
                }
                batch_params[_location_type(location)] = location
                batch = self.collect(location, batch_params)
                self.batches.append(batch)
                sleep(self._regular_wait_time)

        # `concat` raises on an empty list, so fall back to an empty frame
        collected = [batch for batch in self.batches if not batch.empty]
        if not collected:
            return DataFrame(columns=list(self._columns))
        return concat(collected)

    def collect(
        self,
        location: str,
        params: Dict[str, Union[List[str], str]],
    ) -> DataFrame:
        """
        Collect a single batch.

        Requests answered with HTTP status 429 are retried after a pause, up to
        `_max_retries` times.

        Args:
            location (str): Country (e.g., `US`), region (state; e.g., `US-AL`),
                or DMA (metro area; e.g., `US-AL-630` or `630`) code.
            params (dict[str, list[str] | str]): A dictionary with the following entries:

                * `terms` (list[str]): List of terms to collect.
                * `timelineResolution` (str): Collection resolution; `day`, `week`, `month`, or `year`.
                * `time_startDate` (str): First date to collect from; `YYYY-MM-DD`.
                * `time_endDate` (str): Last date to collect from; `YYYY-MM-DD`.

        Examples:
            ```python
            # collect a small, custom sample
            data = collector.collect(
                "US-NY",
                {
                    "terms": ["cough", "/m/01b_21"],
                    "timelineResolution": "month",
                    "time_startDate": "2014-01-01",
                    "time_endDate": "2024-01-01",
                },
            )
            ```

        Raises:
            googleapiclient.errors.HttpError: If the request fails with a status other
                than 429, or is still rate limited after all retries.

        Returns:
            A `pandas.DataFrame` of the prepared results, with these columns:

            * `value`: Number indicating search volume.
            * `date`: Date the searches were recorded on.
            * `location`: Location code in which searches were recorded from.
            * `term`: The search term.
            * `retrieved`: Date retrieved from the API.
        """

        retries = 0
        while True:
            try:
                # pylint: disable=E1101
                response = self.service.getTimelinesForHealth(**params).execute()
                break
            except errors.HttpError as e:
                # only rate limiting is retried, and only a bounded number of times
                if e.status_code != 429 or retries >= self._max_retries:
                    raise
                retries += 1
                sleep(self._fallback_wait_time)

        today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
        data = []
        for line in response.get("lines") or []:
            points = json_normalize(line.get("points") or [])
            if points.empty:
                # nothing was returned for this term
                continue
            points["date"] = to_datetime(points["date"], format="mixed").dt.strftime("%Y-%m-%d")
            points["location"] = location
            points["term"] = line["term"]
            points["retrieved"] = today
            data.append(points)
        if not data:
            return DataFrame(columns=list(self._columns))
        return concat(data)

213 

214 

215def _location_type(location: str): 

216 return "geoRestriction_" + ({2: "country", 5: "region", 3: "dma"}[len(location)])