1"""Collect Google Trends health data."""
2
3import datetime
4from os import getenv
5from os.path import isfile
6from time import sleep
7from typing import ClassVar, Dict, List, Union
8
9from apiclient import discovery, errors
10from pandas import DataFrame, concat, json_normalize, to_datetime
11
12from gtrends_collection.utils import read_scope
13
14
class Collector:
    """
    Collect internet search volumes from the Google Trends timeline for health endpoint.

    See the [schema](https://trends.googleapis.com/$discovery/rest?version=v1beta)
    for more about the API. Only the `getTimelinesForHealth` endpoint is used here.

    Args:
        scope_dir (str): Directory containing the `terms.txt` and `locations.txt` files.
            See Specification.
        key_dir (str): Directory containing a `.env` file, to extract the
            `GOOGLE_API_KEY` variable from, if it is not already in the environment.
        terms_per_batch (int): Maximum terms to include in each collection batch.
            Theoretically 30 is the API's max, but more than 1 seems to not work.
        wait_time (float): Seconds to wait between each batch.
        version (str): Version of the service API.

    Specification:
        To process in batches, search terms and locations must be specified in separate
        files (`terms.txt` and `locations.txt`), stored in the `scope_dir` directory.
        These should contain 1 term / location code per line.

    Collection Process:
        Initializing this class retrieves the Google API service, stores the
        developer key, and points to the scope directory.

        The `process_batches()` method reads in the terms and locations,
        and collects them in batches over the specified time frame.

        Results from each batch are stored in the `batches` property,
        which can be pulled from in case the `process_batches` process does not complete
        (such as if the daily rate limit is reached).

        The `collect()` method collects a single batch, and
        can be used on its own.

    Examples:
        ```python
        from gtrends_collection import Collector

        # initialize the collector
        collector = Collector()
        ```
    """

    # time to wait between requests
    _regular_wait_time = 0.1
    # time to wait after a `rateLimitExceeded` error
    _fallback_wait_time = 2
    # class-level on purpose: batches accumulate across instances and survive an
    # interrupted `process_batches` run, so partial results can be recovered
    batches: ClassVar[List[DataFrame]] = []

    scope_dir = "scope"
    max_terms = 1

    def __init__(
        self,
        scope_dir: str = "scope",
        key_dir: str = ".",
        terms_per_batch: int = 1,
        wait_time: float = 0.1,
        version: str = "v1beta",
    ):
        self._regular_wait_time = wait_time
        self.scope_dir = scope_dir
        self.max_terms = terms_per_batch

        # prefer the environment variable; fall back to a `.env` file in `key_dir`
        key = getenv("GOOGLE_API_KEY")
        if not key and isfile(f"{key_dir}/.env"):
            with open(f"{key_dir}/.env", encoding="utf-8") as content:
                for pair in content.read().split("\n"):
                    # `partition` (rather than `split("=")`) tolerates blank lines,
                    # lines without `=`, and values that themselves contain `=`,
                    # all of which previously raised ValueError on unpacking
                    name, sep, value = pair.partition("=")
                    if sep and name.startswith("GOOGLE_API_KEY"):
                        key = value.strip()
                        break
        if not key:
            msg = "no API key found (GOOGLE_API_KEY environment variable)"
            raise RuntimeError(msg)

        self.service = discovery.build(
            "trends",
            version,
            discoveryServiceUrl=f"https://trends.googleapis.com/$discovery/rest?version={version}",
            developerKey=key,
        )

    def process_batches(
        self,
        start: Union[str, None] = None,
        end: Union[str, None] = None,
        resolution: str = "week",
        override_terms: Union[List[str], None] = None,
        override_location: Union[List[str], None] = None,
    ) -> DataFrame:
        """
        Processes collection batches from scope.

        Args:
            start (str | None): First date to collect from; `YYYY-MM-DD`.
            end (str | None): Last date to collect from; `YYYY-MM-DD`.
            resolution (str): Collection resolution; `day`, `week`, `month`, or `year`.
            override_terms (list[str] | None): List of terms to collect instead of
                those in scope. Useful for changing collection order or filling out
                select terms.
            override_location (list[str] | None): List of locations to collect from
                instead of those in scope.

        Examples:
            ```python
            # collect across all scope-defined terms and locations in 2024
            data = collector.process_batches("2024-01-01", "2024-12-31")
            ```

        Returns:
            A `pandas.DataFrame` of the combined results, including any batches
            retained in `batches` from previous runs.
        """

        params: Dict[str, Union[List[str], str]] = {"timelineResolution": resolution}
        if start:
            params["time_startDate"] = start
        if end:
            params["time_endDate"] = end

        terms = override_terms if override_terms else read_scope(self.scope_dir, "terms")
        locations = override_location if override_location else read_scope(self.scope_dir, "locations")
        # full DMA codes (e.g., `US-AL-630`) are reduced to their numeric part;
        # the set also deduplicates locations
        locations = {loc if len(loc) < 9 else loc.split("-")[2] for loc in locations}

        for term_set in range(0, len(terms), self.max_terms):
            for location in locations:
                batch_params = {
                    "terms": terms[term_set : (term_set + self.max_terms)],
                    **params,
                }
                batch_params[_location_type(location)] = location
                batch = self.collect(location, batch_params)
                self.batches.append(batch)
                sleep(self._regular_wait_time)

        # an empty scope (no terms or locations) would otherwise make `concat` raise
        return concat(self.batches) if self.batches else DataFrame()

    def collect(
        self,
        location: str,
        params: Dict[str, Union[List[str], str]],
    ) -> DataFrame:
        """
        Collect a single batch.

        Args:
            location (str): Country (e.g., `US`), region (state; e.g., `US-AL`),
                or DMA (metro area; e.g., `US-AL-630` or `630`) code.
            params (dict[str, list[str] | str]): A dictionary with the following entries:

                * `terms` (list[str]): List of terms to collect.
                * `timelineResolution` (str): Collection resolution; `day`, `week`, `month`, or `year`.
                * `time_startDate` (str): First date to collect from; `YYYY-MM-DD`.
                * `time_endDate` (str): Last date to collect from; `YYYY-MM-DD`.

        Examples:
            ```python
            # collect a small, custom sample
            data = collector.collect(
                "US-NY",
                {
                    "terms": ["cough", "/m/01b_21"],
                    "timelineResolution": "month",
                    "time_startDate": "2014-01-01",
                    "time_endDate": "2024-01-01",
                },
            )
            ```

        Returns:
            A `pandas.DataFrame` of the prepared results, with these columns:

            * `value`: Number indicating search volume.
            * `date`: Date the searches were recorded on.
            * `location`: Location code in which searches were recorded from.
            * `term`: The search term.
            * `retrieved`: Date retrieved from the API.
        """

        # retry in a loop (rather than recursively) so a long run of rate-limit
        # errors cannot exhaust the interpreter's recursion limit
        while True:
            try:
                # pylint: disable=E1101
                response = self.service.getTimelinesForHealth(**params).execute()
                break
            except errors.HttpError as e:
                if e.status_code != 429:
                    raise
                sleep(self._fallback_wait_time)
        today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
        data = []
        for line in response["lines"]:
            points = json_normalize(line["points"])
            # normalize the API's mixed-format dates to plain `YYYY-MM-DD` strings
            points["date"] = to_datetime(points["date"], format="mixed").dt.strftime("%Y-%m-%d")
            points["location"] = location
            points["term"] = line["term"]
            points["retrieved"] = today
            data.append(points)
        return concat(data)
213
214
215def _location_type(location: str):
216 return "geoRestriction_" + ({2: "country", 5: "region", 3: "dma"}[len(location)])