Coverage for src\gtrends_collection\collector.py: 81% (68 statements)
coverage.py v7.9.0, created at 2025-06-12 05:39 -0400
1"""Collect Google Trends health data."""
3import datetime
4from os import getenv
5from os.path import isfile
6from time import sleep
7from typing import ClassVar, Dict, List, Union
9from apiclient import discovery, errors
10from pandas import DataFrame, concat, json_normalize, to_datetime
12from gtrends_collection.utils import read_scope


class Collector:
    """
    Collect internet search volumes from the Google Trends timelines-for-health endpoint.

    See the [schema](https://trends.googleapis.com/$discovery/rest?version=v1beta)
    for more about the API. Only the `getTimelinesForHealth` endpoint is used here.

    Args:
        scope_dir (str): Directory containing the `terms.txt` and `locations.txt` files.
            See Specification.
        key_dir (str): Directory containing a `.env` file to extract the
            `GOOGLE_API_KEY` variable from, if it is not already in the environment.
        terms_per_batch (int): Maximum number of terms to include in each collection batch.
            The API's theoretical maximum is 30, but more than 1 does not appear to work.
        wait_time (float): Seconds to wait between batches.
        version (str): Version of the service API.

    Specification:
        To process in batches, search terms and locations must be specified in separate
        files (`terms.txt` and `locations.txt`), stored in the `scope_dir` directory.
        These should contain one term or location code per line.
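
        For example, a minimal `terms.txt` might contain (entries here are illustrative):

        ```text
        cough
        /m/01b_21
        ```

        and `locations.txt`:

        ```text
        US
        US-NY
        ```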

    Collection Process:
        Initializing this class retrieves the Google API service, stores the
        developer key, and points to the scope directory.

        The `process_batches()` method reads in the terms and locations,
        and collects them in batches over the specified time frame.

        Results from each batch are stored in the `batches` attribute, which can be
        drawn on if `process_batches` does not run to completion (such as when the
        daily rate limit is reached).

        The `collect()` method collects a single batch, and can be used on its own.

    Examples:
        ```python
        from gtrends_collection import Collector

        # initialize the collector
        collector = Collector()
        ```
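
        Once initialized, a typical next step is a scope-wide collection; the date
        range below is only an illustration:

        ```python
        # collect weekly search volumes for all scope-defined terms and locations
        data = collector.process_batches("2024-01-01", "2024-12-31")
        ```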
58 """

    # time to wait between requests
    _regular_wait_time = 0.1
    # time to wait after a `rateLimitExceeded` error
    _fallback_wait_time = 2
    # results of completed batches, shared at the class level
    batches: ClassVar[List[DataFrame]] = []

    scope_dir = "scope"
    max_terms = 1

    def __init__(
        self,
        scope_dir: str = "scope",
        key_dir: str = ".",
        terms_per_batch: int = 1,
        wait_time: float = 0.1,
        version: str = "v1beta",
    ):
        self._regular_wait_time = wait_time
        self.scope_dir = scope_dir
        self.max_terms = terms_per_batch
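
        # The developer key is taken from the environment if already set; otherwise
        # it is read from a `.env` file expected to contain a line such as
        # (placeholder value):
        #   GOOGLE_API_KEY=your-key-here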
        key = getenv("GOOGLE_API_KEY")
        if not key and isfile(f"{key_dir}/.env"):  # partial branch: always true in tests
            with open(f"{key_dir}/.env", encoding="utf-8") as content:
                for pair in content.read().split("\n"):  # partial branch: never ran to exhaustion in tests
                    name, value = pair.split("=")
                    if name.startswith("GOOGLE_API_KEY"):  # partial branch: always true in tests
                        key = value.strip()
                        break
        if not key:  # partial branch: never true in tests
            msg = "no API key found (GOOGLE_API_KEY environment variable)"
            raise RuntimeError(msg)

        self.service = discovery.build(
            "trends",
            version,
            discoveryServiceUrl=f"https://trends.googleapis.com/$discovery/rest?version={version}",
            developerKey=key,
        )

    def process_batches(
        self,
        start: Union[str, None] = None,
        end: Union[str, None] = None,
        resolution: str = "week",
        override_terms: Union[List[str], None] = None,
        override_location: Union[List[str], None] = None,
    ) -> DataFrame:
108 """
109 Processes collection batches from scope.
111 Args:
112 start (str | None): First date to collect from; `YYYY-MM-DD`.
113 end (str | None): Last date to collect from; `YYYY-MM-DD`.
114 resolution (str): Collection resolution; `day`, `week`, `month`, or `year`.
115 override_terms (str): List of terms to collect instead of those in scope.
116 Useful for changing collection order or filling out select terms.
117 override_location (str): List of locations to collect from instead of those in scope.
119 Examples:
120 ```python
121 # collect across all scope-defined terms and locations in 2024
122 data = collector.process_batches("2024-01-01", "2024-12-31")
123 ```
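
            If collection stops early (for example, when the daily rate limit is
            reached), the batches gathered so far remain in `collector.batches` and
            can still be combined; `pd` below is assumed to be `pandas`:

            ```python
            import pandas as pd

            partial = pd.concat(collector.batches)
            ```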

        Returns:
            A `pandas.DataFrame` of the combined results.
        """

        params: Dict[str, Union[List[str], str]] = {"timelineResolution": resolution}
        if start:  # partial branch: never true in tests
            params["time_startDate"] = start
        if end:  # partial branch: never true in tests
            params["time_endDate"] = end

        terms = override_terms if override_terms else read_scope(self.scope_dir, "terms")
        locations = override_location if override_location else read_scope(self.scope_dir, "locations")
        # reduce full DMA codes (e.g., `US-AL-630`) to their numeric part (`630`)
        locations = {loc if len(loc) < 9 else loc.split("-")[2] for loc in locations}

        for term_set in range(0, len(terms), self.max_terms):
            for location in locations:
                batch_params = {
                    "terms": terms[term_set : (term_set + self.max_terms)],
                    **params,
                }
                batch_params[_location_type(location)] = location
                batch = self.collect(location, batch_params)
                self.batches.append(batch)
                sleep(self._regular_wait_time)

        data = concat(self.batches)
        return data

    def collect(
        self,
        location: str,
        params: Dict[str, Union[List[str], str]],
    ) -> DataFrame:
158 """
159 Collect a single batch.
161 Args:
162 location (str): Country (e.g., `US`), region (state; e.g., `US-AL`),
163 or DMA (metro area; e.g., `US-AL-630` or `630`) code.
164 params (dict[str, list[str] | str]): A dictionary with the following entries:
166 * `terms` (list[str]): List of terms to collect.
167 * `timelineResolution` (str): Collection resolution; `day`, `week`, `month`, or `year`.
168 * `time_startDate` (str): First date to collect from; `YYYY-MM-DD`.
169 * `time_endDate` (str): First date to collect from; `YYYY-MM-DD`.
171 Examples:
172 ```python
173 # collect a small, custom sample
174 data = collector.collect(
175 "US-NY",
176 {
177 "terms": ["cough", "/m/01b_21"],
178 "timelineResolution": "month",
179 "time_startDate": "2014-01-01",
180 "time_endDate": "2024-01-01",
181 },
182 )
183 ```

        Returns:
            A `pandas.DataFrame` of the prepared results, with these columns:

            * `value`: Number indicating search volume.
            * `date`: Date the searches were recorded on.
            * `location`: Location code in which the searches were recorded.
            * `term`: The search term.
            * `retrieved`: Date retrieved from the API.
        """

        try:
            # pylint: disable=E1101
            response = self.service.getTimelinesForHealth(**params).execute()
        except errors.HttpError as e:
            # back off and retry after a rate-limit (429) error
            if e.status_code == 429:
                sleep(self._fallback_wait_time)
                return self.collect(location, params)
            raise e
        today = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%d")
        data = []
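        # `response` is expected to hold one entry per requested term, roughly of
        # the form (field names as used below; values illustrative):
        #   {"lines": [{"term": "cough",
        #               "points": [{"date": "...", "value": 12.3}, ...]}, ...]}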
        for line in response["lines"]:
            points = json_normalize(line["points"])
            points["date"] = to_datetime(points["date"], format="mixed").dt.strftime("%Y-%m-%d")
            points["location"] = location
            points["term"] = line["term"]
            points["retrieved"] = today
            data.append(points)
        return concat(data)


def _location_type(location: str):
    return "geoRestriction_" + ({2: "country", 5: "region", 3: "dma"}[len(location)])
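

# For reference, the mapping above resolves, for example:
#   _location_type("US")    -> "geoRestriction_country"
#   _location_type("US-AL") -> "geoRestriction_region"
#   _location_type("630")   -> "geoRestriction_dma"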