#' Download Census Population Data
#'
#' Download American Community Survey population data from the U.S. Census Bureau.
#'
#' @param year Data year.
#' @param out_dir Directory to download the file to.
#' @param state_only Logical; if \code{TRUE}, will only load state data.
#' Will still download county data.
#' @param overwrite Logical; if \code{TRUE}, will re-download and overwrite existing data.
#' @param verbose Logical; if \code{FALSE}, will not display status messages.
#' @returns A \code{data.frame} including \code{GEOID} and \code{region_name}
#' for states and counties, along with their population, in total and within
#' age brackets.
#' @examples
#' if (file.exists("../../resources/census_population_2021.csv.xz")) {
#'   pophive_load_census(2021, "../../resources")[1:10, ]
#' }
#' @export

pophive_load_census <- function(
  year = 2021,
  out_dir = NULL,
  state_only = FALSE,
  overwrite = FALSE,
  verbose = TRUE
) {
  out_file <- paste0(out_dir, "/census_population_", year, ".csv.xz")
  write_out <- !is.null(out_dir)
  if (!overwrite && write_out && file.exists(out_file)) {
    if (verbose) cli::cli_progress_step("reading in existing file")
    invisible(as.data.frame(vroom::vroom(
      out_file,
      delim = ",",
      col_types = list(
        GEOID = "c",
        region_name = "c",
        Total = "i",
        `<10 Years` = "i",
        `10-14 Years` = "i",
        `15-19 Years` = "i",
        `20-39 Years` = "i",
        `40-64 Years` = "i",
        `65+ Years` = "i"
      ),
      n_max = if (state_only) 52L else Inf
    )))
  } else {
    # GEOID to region name mapping
    id_url <- "https://www2.census.gov/geo/docs/reference/codes2020/national_"
    if (verbose) cli::cli_progress_step("downloading state IDs map")
    state_ids <- vroom::vroom(
      paste0(id_url, "state2020.txt"),
      delim = "|",
      col_types = list(
        STATE = "c",
        STATEFP = "c",
        STATENS = "c",
        STATE_NAME = "c"
      )
    )
    if (verbose) cli::cli_progress_step("downloading county IDs map")
    county_ids <- vroom::vroom(
      paste0(id_url, "county2020.txt"),
      delim = "|",
      col_types = list(
        STATE = "c",
        STATEFP = "c",
        COUNTYFP = "c",
        COUNTYNS = "c",
        COUNTYNAME = "c",
        CLASSFP = "c",
        FUNCSTAT = "c"
      )
    )
    region_name <- structure(
      sub(
        " County",
        "",
        c(
          state_ids$STATE_NAME,
          paste0(county_ids$COUNTYNAME, ", ", county_ids$STATE)
        ),
        fixed = TRUE
      ),
      names = c(
        state_ids$STATEFP,
        paste0(county_ids$STATEFP, county_ids$COUNTYFP)
      )
    )

    # population data

    ## age group labels from IDs
    if (verbose) cli::cli_progress_step("downloading ACS variable labels")
    labels <- vroom::vroom(
      paste0(
        "https://www2.census.gov/programs-surveys/acs/summary_file/",
        min(2021L, year),
        "/sequence-based-SF/documentation/user_tools/ACS_5yr_Seq_Table_Number_Lookup.txt"
      ),
      delim = ",",
      col_types = list(
        `File ID` = "c",
        `Table ID` = "c",
        `Sequence Number` = "c",
        `Line Number` = "d",
        `Start Position` = "i",
        `Total Cells in Table` = "c",
        `Total Cells in Sequence` = "i",
        `Table Title` = "c",
        `Subject Area` = "c"
      )
    )
    variable_labels <- structure(
      labels$`Table Title`,
      names = paste0(
        labels$`Table ID`,
        "_E",
        formatC(labels$`Line Number`, width = 3L, flag = "0")
      )
    )

    ## age group counts
    url <- paste0(
      "https://www2.census.gov/programs-surveys/acs/summary_file/",
      year,
      "/table-based-SF/data/5YRData/acsdt5y",
      year,
      "-b01001.dat"
    )
    if (verbose) cli::cli_progress_step("downloading population data")
    data <- vroom::vroom(url, delim = "|", col_types = list(GEO_ID = "c"))
    data <- data[
      grep("0[45]00000US", data$GEO_ID),
      grep("E", colnames(data), fixed = TRUE)
    ]
    colnames(data)[-1L] <- variable_labels[colnames(data)[-1L]]

    age_groups <- list(
      Total = "Total:",
      `<10 Years` = c("Under 5 years", "5 to 9 years"),
      `10-14 Years` = "10 to 14 years",
      `15-19 Years` = c("15 to 17 years", "18 and 19 years"),
      `20-39 Years` = c(
        "20 years",
        "21 years",
        "22 to 24 years",
        "25 to 29 years",
        "30 to 34 years",
        "35 to 39 years"
      ),
      `40-64 Years` = c(
        "40 to 44 years",
        "45 to 49 years",
        "50 to 54 years",
        "55 to 59 years",
        "60 and 61 years",
        "62 to 64 years"
      ),
      `65+ Years` = c(
        "65 and 66 years",
        "67 to 69 years",
        "70 to 74 years",
        "75 to 79 years",
        "80 to 84 years",
        "85 years and over"
      )
    )
    if (verbose)
      cli::cli_progress_step("aggregating across sex and fine age groups")
    pop <- cbind(
      data.frame(GEOID = substring(data$GEO_ID, 10L), region_name = ""),
      do.call(
        cbind,
        lapply(age_groups, function(l) rowSums(data[, colnames(data) %in% l]))
      )
    )
    pop$region_name <- region_name[pop$GEOID]
    states <- pop[1L:52L, ]
    health_regions <- as.data.frame(do.call(
      rbind,
      lapply(
        split(
          states[, -(1L:2L)],
          pophive_to_health_region(states$GEOID, "hhs_")
        ),
        colSums
      )
    ))
    health_regions$GEOID <- rownames(health_regions)
    health_regions$region_name <- sub(
      "hhs_",
      "Health Region ",
      rownames(health_regions),
      fixed = TRUE
    )
    pop <- rbind(pop, health_regions[, colnames(pop)])

    if (write_out) {
      if (verbose) cli::cli_progress_step("writing output")
      dir.create(out_dir, recursive = TRUE, showWarnings = FALSE)
      vroom::vroom_write(pop, out_file, ",")
    }
    invisible(if (state_only) states else pop)
  }
}
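
# Usage sketch (hypothetical cache directory; wrapped in `if (FALSE)` so it is
# inert at load time): a first call downloads and writes the compressed CSV,
# and later calls with the same `out_dir` read that cache instead.
if (FALSE) {
  pop <- pophive_load_census(2021, tempdir())
  # cached read, limited to the first 52 rows (states)
  states <- pophive_load_census(2021, tempdir(), state_only = TRUE)
  states[, c("GEOID", "region_name", "Total")]
}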

#' Download Data from the CDC
#'
#' Download data and metadata from the Centers for Disease Control and Prevention (CDC).
#'
#' @param id ID of the resource (e.g., \code{ijqb-a7ye}).
#' @param out_dir Directory in which to save the metadata and data files.
#' @param state The state ID of a previous download (an MD5 hash of the metadata
#' file, as returned from this function); if provided, data will only be
#' re-downloaded if the new state does not match.
#' @param verbose Logical; if \code{FALSE}, will not display status messages.
#' @returns The state ID (MD5 hash) of the downloaded metadata file;
#' downloads files (\code{<id>.json} and \code{<id>.csv.xz}) to \code{out_dir}.
#' @section \code{data.cdc.gov} URLs:
#'
#' For each resource ID, there are three relevant CDC URLs:
#' \itemize{
#' \item \strong{\code{resource/<id>}}: This redirects to the resource's main page,
#' with displayed metadata and a data preview
#' (e.g., \href{https://data.cdc.gov/resource/ijqb-a7ye}{data.cdc.gov/resource/ijqb-a7ye}).
#' \item \strong{\code{api/views/<id>}}: This is a direct link to the underlying
#' JSON metadata (e.g., \href{https://data.cdc.gov/api/views/ijqb-a7ye}{data.cdc.gov/api/views/ijqb-a7ye}).
#' \item \strong{\code{api/views/<id>/rows.csv}}: This is a direct link to the full
#' CSV dataset (e.g., \href{https://data.cdc.gov/api/views/ijqb-a7ye/rows.csv}{data.cdc.gov/api/views/ijqb-a7ye/rows.csv}).
#' }
#'
#' @examples
#' \dontrun{
#' pophive_download_cdc("ijqb-a7ye")
#' }
#' @export

pophive_download_cdc <- function(
  id,
  out_dir = "raw",
  state = NULL,
  verbose = TRUE
) {
  dir.create(out_dir, showWarnings = FALSE, recursive = TRUE)
  if (verbose) {
    resource_url <- paste0("https://data.cdc.gov/resource/", id)
    cli::cli_h1(
      "downloading resource {.url {resource_url}}"
    )
  }
  url <- paste0("https://data.cdc.gov/api/views/", id)
  initial_timeout <- options(timeout = 99999)$timeout
  on.exit(options(timeout = initial_timeout))
  if (verbose) cli::cli_progress_step("metadata: {.url {url}}")
  metadata_file <- paste0(out_dir, "/", id, ".json")
  status <- utils::download.file(url, metadata_file, quiet = TRUE)
  if (status != 0L) cli::cli_abort("failed to download metadata")
  new_state <- as.list(tools::md5sum(metadata_file))
  if (!identical(new_state, state)) {
    data_url <- paste0(url, "/rows.csv")
    out_path <- paste0(out_dir, "/", id, ".csv")
    if (verbose) cli::cli_progress_step("data: {.url {data_url}}")
    status <- utils::download.file(data_url, out_path, quiet = TRUE)
    if (status != 0L) cli::cli_abort("failed to download data")
    if (verbose) cli::cli_progress_step("compressing data")
    unlink(paste0(out_path, ".xz"))
    status <- system2("xz", c("-f", out_path))
    if (status != 0L) cli::cli_abort("failed to compress data")
    if (verbose) cli::cli_progress_done()
    invisible(new_state)
  } else {
    invisible(state)
  }
}
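
# Usage sketch (resource ID reused from the docs; inert `if (FALSE)` guard):
# the returned md5 `state` can be stored and passed back to skip
# re-downloading unchanged data.
if (FALSE) {
  state <- pophive_download_cdc("ijqb-a7ye", out_dir = tempdir())
  # no data re-download if the metadata file's md5 is unchanged
  state <- pophive_download_cdc("ijqb-a7ye", out_dir = tempdir(), state = state)
}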

#' Check Data Sources
#'
#' Check the data files and measure info of source projects.
#'
#' @param names Name or names of source projects.
#' @param source_dir Path to the directory containing the source projects.
#' @param verbose Logical; if \code{FALSE}, will not print status messages.
#' @returns A list with an entry for each source; each entry is a list with an
#' entry for each checked file, containing any of these issue codes:
#' \itemize{
#' \item \code{not_compressed}: The file does not appear to be compressed.
#' \item \code{cant_read}: Failed to read the file in.
#' \item \code{geography_missing}: The file does not contain a \code{geography} column.
#' \item \code{geography_nas}: The file's \code{geography} column contains NAs.
#' \item \code{time_missing}: The file does not contain a \code{time} column.
#' \item \code{time_nas}: The file's \code{time} column contains NAs.
#' \item \code{missing_info: {column_name}}: The file's indicated column does not have
#' a matching entry in \code{measure_info.json}.
#' }
#' @examples
#' \dontrun{
#' pophive_check_sources("gtrends")
#' }
#' @export

pophive_check_sources <- function(
  names = list.dirs("data", recursive = FALSE, full.names = FALSE),
  source_dir = "data",
  verbose = TRUE
) {
  issues <- list()
  for (name in names) {
    base_dir <- paste0(source_dir, "/", name, "/")
    if (!dir.exists(base_dir))
      cli::cli_abort("specify the name of an existing data source")
    process_file <- paste0(base_dir, "process.json")
    pophive_add_source(name, source_dir, FALSE)
    if (!file.exists(process_file)) {
      cli::cli_abort("{name} does not appear to be a data source project")
    }
    process <- jsonlite::read_json(process_file)
    info_file <- paste0(base_dir, "measure_info.json")
    info <- tryCatch(
      community::data_measure_info(
        info_file,
        render = TRUE,
        write = FALSE,
        verbose = FALSE,
        open_after = FALSE
      ),
      error = function(e) NULL
    )
    if (is.null(info)) cli::cli_abort("{.file {info_file}} is malformed")
    if (verbose)
      cli::cli_bullets(c("", "Checking data source {.strong {name}}"))
    data_files <- list.files(
      paste0(base_dir, "standard/"),
      "\\.(?:csv|parquet)",
      full.names = TRUE
    )
    source_issues <- list()
    for (file in list.files(
      paste0(base_dir, "raw/"),
      "csv$",
      full.names = TRUE
    )) {
      source_issues[[file]] <- list(data = "not_compressed")
    }
    if (length(data_files)) {
      for (file in data_files) {
        issue_messages <- NULL
        if (verbose) {
          cli::cli_progress_step("checking file {.file {file}}", spinner = TRUE)
        }
        data_issues <- NULL
        measure_issues <- NULL
        data <- tryCatch(
          if (grepl("parquet$", file)) {
            dplyr::collect(arrow::read_parquet(file))
          } else {
            con <- gzfile(file)
            on.exit(close(con), add = TRUE)
            vroom::vroom(con, show_col_types = FALSE)
          },
          error = function(e) NULL
        )
        if (is.null(data)) {
          data_issues <- c(data_issues, "cant_read")
        } else {
          if (grepl("csv$", file)) {
            data_issues <- c(data_issues, "not_compressed")
            if (verbose)
              issue_messages <- c(
                issue_messages,
                "file is not compressed"
              )
          }
          if (!("geography" %in% colnames(data))) {
            data_issues <- c(data_issues, "geography_missing")
            if (verbose)
              issue_messages <- c(
                issue_messages,
                "missing {.emph geography} column"
              )
          } else if (anyNA(data$geography)) {
            data_issues <- c(data_issues, "geography_nas")
            if (verbose)
              issue_messages <- c(
                issue_messages,
                "{.emph geography} column contains NAs"
              )
          }
          if (!("time" %in% colnames(data))) {
            data_issues <- c(data_issues, "time_missing")
            if (verbose)
              issue_messages <- c(
                issue_messages,
                "missing {.emph time} column"
              )
          } else if (anyNA(data$time)) {
            data_issues <- c(data_issues, "time_nas")
            if (verbose)
              issue_messages <- c(
                issue_messages,
                "{.emph time} column contains NAs"
              )
          }
          for (col in colnames(data)) {
            if (!(col %in% c("geography", "time")) && !(col %in% names(info))) {
              measure_issues <- c(measure_issues, paste("missing_info:", col))
              if (verbose)
                issue_messages <- c(
                  issue_messages,
                  paste0(
                    "{.emph ",
                    col,
                    "} column does not have an entry in measure_info"
                  )
                )
            }
          }
        }
        file_issues <- list()
        if (length(data_issues)) file_issues$data <- data_issues
        if (length(measure_issues)) file_issues$measures <- measure_issues
        source_issues[[file]] <- file_issues
        if (verbose) {
          if (length(issue_messages)) {
            cli::cli_progress_done(result = "failed")
            cli::cli_bullets(structure(
              issue_messages,
              names = rep(" ", length(issue_messages))
            ))
          } else {
            cli::cli_progress_done()
          }
        }
      }
    } else {
      if (verbose) cli::cli_alert_info("no standard data files found to check")
    }
    process$checked <- Sys.time()
    process$check_results <- source_issues
    jsonlite::write_json(
      process,
      process_file,
      auto_unbox = TRUE,
      pretty = TRUE
    )
    issues[[name]] <- source_issues
  }

  invisible(issues)
}
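
# Usage sketch (assumes a "gtrends" project under "data/"; inert `if (FALSE)`
# guard): results are keyed by source, then by checked file.
if (FALSE) {
  issues <- pophive_check_sources("gtrends")
  issues$gtrends # per-file lists of `data` and `measures` issue codes
}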

#' Map States to Health Regions
#'
#' Maps state FIPS codes to U.S. Department of Health and Human Services (HHS) regions.
#'
#' @param geoids Character vector of GEOIDs.
#' @param prefix A prefix to add to region IDs.
#' @returns A vector of Health Region names the same length as \code{geoids}.
#' @examples
#' pophive_to_health_region(c("01", "01001", "02", "02001"))
#' @export

pophive_to_health_region <- function(geoids, prefix = "Region ") {
  regions <- c(
    "01" = 4,
    "02" = 10,
    "04" = 9,
    "05" = 6,
    "06" = 9,
    "08" = 8,
    "09" = 1,
    "10" = 3,
    "11" = 3,
    "12" = 4,
    "13" = 4,
    "15" = 9,
    "16" = 10,
    "17" = 5,
    "18" = 5,
    "19" = 7,
    "20" = 7,
    "21" = 4,
    "22" = 6,
    "23" = 1,
    "24" = 3,
    "25" = 1,
    "26" = 5,
    "27" = 5,
    "28" = 4,
    "29" = 7,
    "30" = 8,
    "31" = 7,
    "32" = 9,
    "33" = 1,
    "34" = 2,
    "35" = 6,
    "36" = 2,
    "37" = 4,
    "38" = 8,
    "39" = 5,
    "40" = 6,
    "41" = 10,
    "42" = 3,
    "44" = 1,
    "45" = 4,
    "46" = 8,
    "47" = 4,
    "48" = 6,
    "49" = 8,
    "50" = 1,
    "51" = 3,
    "53" = 10,
    "54" = 3,
    "55" = 5,
    "56" = 8,
    "66" = 9,
    "72" = 2,
    "74" = 2
  )
  regions[] <- paste0(prefix, regions)
  unname(regions[substring(geoids, 1L, 2L)])
}
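
# Usage sketch (inert `if (FALSE)` guard): county GEOIDs map through their
# first two digits, and the "hhs_" prefix matches the grouping used by
# pophive_load_census().
if (FALSE) {
  pophive_to_health_region(c("36", "36001"), prefix = "hhs_")
  # "hhs_2" "hhs_2"
}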

#' Process Epic Staging Files
#'
#' Process Epic staging files, lightly standardizing them and moving them to raw.
#'
#' @param staging_dir Directory containing the staging files.
#' @param out_dir Directory to write new raw files to.
#' @param verbose Logical; if \code{FALSE}, will not show status messages.
#' @param cleanup Logical; if \code{FALSE}, will not remove staging files after they are processed.
#' @returns \code{NULL} if no staging files are found.
#' Otherwise, a list with entries for \code{data} and \code{metadata}.
#' Each of these is a list with entries for each recognized standard name,
#' with potentially combined outputs similar to \code{\link{pophive_read_epic}}.
#'
#' @examples
#' \dontrun{
#' # run from a source project
#' pophive_process_epic_staging()
#' }
#'
#' @export

pophive_process_epic_staging <- function(
  staging_dir = "raw/staging",
  out_dir = "raw",
  verbose = TRUE,
  cleanup = TRUE
) {
  files <- sort(list.files(
    staging_dir,
    "csv",
    full.names = TRUE,
    recursive = TRUE
  ))
  files <- files[!grepl("census", files)]
  if (!length(files)) {
    if (verbose) cli::cli_progress_message("no staging files found")
    return(NULL)
  }
  id_cols <- c("state", "county", "age", "year", "month", "week")
  metadata <- list()
  data <- list()
  for (file in files) {
    if (verbose)
      cli::cli_progress_step("processing file {.file {file}}", spinner = TRUE)
    epic <- tryCatch(pophive_read_epic(file), error = function(e) NULL)
    if (is.null(epic)) {
      if (verbose) cli::cli_progress_done(result = "failed")
      next
    }
    if (epic$metadata$standard_name == "") {
      if (verbose) {
        cli::cli_progress_update(
          status = "failed to identify standard type for {.file {file}}"
        )
        cli::cli_progress_done(result = "failed")
      }
      next
    }
    name <- epic$metadata$standard_name
    metadata[[name]] <- c(list(epic$metadata), metadata[[name]])
    file_id_cols <- id_cols[id_cols %in% colnames(epic$data)]
    # drop rows that are missing all non-ID values
    epic$data <- epic$data[
      rowMeans(is.na(epic$data[,
        !(colnames(epic$data) %in% file_id_cols),
        drop = FALSE
      ])) !=
        1,
    ]
    n_col <- grep("^n_", colnames(epic$data))
    if (length(n_col)) {
      colnames(epic$data)[[n_col]] <- paste0("n_", epic$metadata$standard_name)
    }
    if (!is.null(data[[name]])) {
      cols <- colnames(data[[name]])
      cols <- cols[!(cols %in% colnames(epic$data))]
      if (length(cols)) epic$data[, cols] <- NA
      epic$data <- epic$data[, colnames(data[[name]])]
      file_id_cols <- id_cols[id_cols %in% colnames(data[[name]])]
      # replace previously collected rows with their newer versions
      data[[name]] <- data[[name]][
        !(do.call(paste, data[[name]][, file_id_cols]) %in%
          do.call(paste, epic$data[, file_id_cols])),
      ]
    }
    data[[name]] <- rbind(epic$data, data[[name]])
    if (verbose) cli::cli_progress_done()
  }
  for (name in names(metadata)) {
    if (verbose)
      cli::cli_progress_step(
        "writing standard raw output for {.field {name}}",
        spinner = TRUE
      )
    paths <- paste0(out_dir, "/", name, ".", c("json", "csv.xz"))
    jsonlite::write_json(
      metadata[[name]],
      paths[[1L]],
      auto_unbox = TRUE,
      pretty = TRUE
    )
    vroom::vroom_write(data[[name]], paths[[2L]])
    if (cleanup) unlink(vapply(metadata[[name]], "[[", "", "file"))
    if (verbose) cli::cli_progress_done()
  }
  return(list(metadata = metadata, data = data))
}
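
# Usage sketch (hypothetical staging layout; inert `if (FALSE)` guard):
# combined per-standard-name outputs land in `out_dir` as paired files.
if (FALSE) {
  staged <- pophive_process_epic_staging("raw/staging", "raw", cleanup = FALSE)
  names(staged$data) # e.g., "flu", "rsv"
  # each name gets raw/<name>.json metadata and raw/<name>.csv.xz data
}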

#' Read Epic Cosmos Data
#'
#' Read in metadata and data from an Epic Cosmos file.
#'
#' @param path Path to the file.
#' @param path_root Directory containing \code{path}, if it is not a full path.
#' @returns A list with \code{data.frame} entries for \code{metadata} and \code{data}.
#'
#' @examples
#' # write an example file
#' path <- tempfile(fileext = ".csv")
#' raw_lines <- c(
#'   "metadata field,metadata value,",
#'   ",,",
#'   ",Measures,Value Name",
#'   "Year,Measure 1,",
#'   "2020,m1,1",
#'   ",m2,2",
#'   "2021,m1,3",
#'   ",m2,4"
#' )
#' writeLines(raw_lines, path)
#'
#' # read it in
#' pophive_read_epic(basename(path), dirname(path))
#'
#' @export

pophive_read_epic <- function(path, path_root = ".") {
  full_path <- if (file.exists(path)) path else
    sub("//", "/", paste0(path_root, "/", path), fixed = TRUE)
  lines <- readLines(full_path, n = 25L, skipNul = FALSE)
  # blank (comma-only) lines separate the metadata header from the data
  metadata_break <- grep("^[, ]*$", lines)
  if (!length(metadata_break))
    cli::cli_abort(
      "path does not appear to point to a file in the Epic format (no metadata separation)"
    )
  meta_end <- min(metadata_break) - 1L
  data_start <- (if (length(metadata_break) == 1L) metadata_break else
    max(metadata_break[
      metadata_break == c(-1L, metadata_break[-1L])
    ])) +
    1L
  meta <- c(
    list(
      file = path,
      md5 = unname(tools::md5sum(full_path)),
      date_processed = Sys.time(),
      standard_name = ""
    ),
    as.list(unlist(lapply(
      strsplit(sub(",+$", "", lines[seq_len(meta_end)]), ",", fixed = TRUE),
      function(r) {
        l <- list(paste(r[-1L], collapse = ","))
        if (l[[1L]] == "") {
          r <- strsplit(r, ": ", fixed = TRUE)[[1L]]
          l <- list(paste(r[-1L], collapse = ","))
        }
        names(l) <- r[[1L]]
        l[[1L]] <- gsub('^"|"$', "", l[[1L]])
        l
      }
    )))
  )
  standard_names <- c(
    vaccine_mmr = "MMR receipt",
    rsv_tests = "RSV tests",
    flu = "Influenza",
    self_harm = "self-harm",
    covid = "COVID",
    rsv = "RSV",
    obesity = "BMI",
    obesity = "obesity",
    all_encounters = "All ED Encounters"
  )
  meta_string <- paste(unlist(meta), collapse = " ")
  for (i in seq_along(standard_names)) {
    if (grepl(standard_names[[i]], meta_string, fixed = TRUE)) {
      meta$standard_name <- names(standard_names)[[i]]
      break
    }
  }
  id_cols <- seq_len(
    length(strsplit(lines[data_start], "^,|Measures,")[[1L]]) - 1L
  )
  header <- c(
    strsplit(lines[data_start + 1L], ",", fixed = TRUE)[[1L]][id_cols],
    strsplit(lines[data_start], ",", fixed = TRUE)[[1L]][-id_cols]
  )
  data <- arrow::read_csv_arrow(
    full_path,
    col_names = header,
    col_types = paste(rep("c", length(header)), collapse = ""),
    na = c("", "-"),
    skip = data_start + 1L
  )
  percents <- grep("^(?:Percent|Base|RSV test)", header)
  if (length(percents)) {
    for (col in percents) {
      data[[col]] <- sub("%", "", data[[col]], fixed = TRUE)
    }
  }
  number <- grep("Number", header, fixed = TRUE)
  if (length(number)) {
    for (col in number) {
      data[[col]][data[[col]] == "10 or fewer"] <- 5L
    }
  }
  # ID values are only recorded on their first row, so fill them downward
  for (col in id_cols) {
    data[[col]] <- vctrs::vec_fill_missing(data[[col]], "down")
  }
  if (all(c("Measures", "Base Patient") %in% colnames(data))) {
    data <- Reduce(
      merge,
      lapply(split(data, data$Measures), function(d) {
        measure <- d$Measures[[1L]]
        d[[measure]] <- d[["Base Patient"]]
        d[, !(colnames(d) %in% c("Measures", "Base Patient"))]
      })
    )
  }
  colnames(data) <- standard_columns(colnames(data))
  if (meta$standard_name == "obesity") {
    meta$standard_name <- paste0(
      meta$standard_name,
      "_",
      if ("state" %in% colnames(data)) "state" else "county"
    )
  }
  if ("age" %in% colnames(data)) {
    std_age <- standard_age(data$age)
    missed_ages <- (data$age != "No value") & is.na(std_age)
    if (any(missed_ages)) {
      std_age[missed_ages] <- data$age[missed_ages]
      missed_levels <- unique(data$age[missed_ages])
      cli::cli_warn("missed age levels: {.field {missed_levels}}")
    }
    data$age <- std_age
  }
  list(metadata = meta, data = data)
}

standard_age <- function(age) {
  c(
    `less than 1 years` = "<1 Years",
    `1 and < 2 years` = "1-2 Years",
    `2 and < 3 years` = "2-3 Years",
    `3 and < 4 years` = "3-4 Years",
    `1 and < 5 years` = "1-4 Years",
    `1 year or more and less than 5 years` = "1-4 Years",
    `4 and < 5 years` = "4-5 Years",
    `less than 5 years` = "<5 Years",
    `5 and < 6 years` = "5-6 Years",
    `5 and < 18 years` = "5-17 Years",
    `5 years or more and less than 18 years (1)` = "5-17 Years",
    `6 and < 7 years` = "6-7 Years",
    `6 years or more` = "6+ Years",
    `7 and < 8 years` = "7-8 Years",
    `8 and < 9 years` = "8-9 Years",
    `9 years or more` = "9+ Years",
    `less than 10 years` = "<10 Years",
    `10 and < 15 years` = "10-14 Years",
    `15 and < 20 years` = "15-19 Years",
    `18 and < 40 years` = "18-39 Years",
    `18 and < 50 years` = "18-49 Years",
    `18 years or more and less than 50 years` = "18-49 Years",
    `20 and < 40 years` = "20-39 Years",
    `40 and < 65 years` = "40-64 Years",
    `50 and < 65 years` = "50-64 Years",
    `50 years or more and less than 64 years` = "50-64 Years",
    `65 years or more` = "65+ Years",
    `65 and < 110 years` = "65+ Years",
    `total` = "Total"
  )[
    sub("^[^a-z0-9]+|:.*$", "", tolower(age))
  ]
}

standard_columns <- function(cols) {
  cols <- gsub(" ", "_", sub("number of ", "n_", tolower(cols)), fixed = TRUE)
  cols[grep("^age", cols)] <- "age"
  cols[grep("^state", cols)] <- "state"
  cols[grep("^county", cols)] <- "county"
  cols[grep("bmi_30", cols)] <- "bmi_30_49.8"
  cols[grep("hemoglobin_a1c_7", cols)] <- "hemoglobin_a1c_7"
  cols[grep("mmr_receipt", cols)] <- "mmr_receipt"
  cols[grep("^rsv_tests", cols)] <- "rsv_tests"
  cols
}
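
# Illustration of the internal helpers (inert `if (FALSE)` guard):
# standard_age() normalizes Epic's age-bracket labels, and standard_columns()
# snake-cases and renames known columns.
if (FALSE) {
  standard_age(c("Less than 1 years", "65 and < 110 years", "Total"))
  # -> "<1 Years", "65+ Years", "Total"
  standard_columns(c("State of residence", "Number of Encounters"))
  # -> "state", "n_encounters"
}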

#' Run Data Sources
#'
#' Optionally run the ingestion script for each data source, and collect metadata.
#'
#' @param name Name of a source project to process. If not specified, all
#' source projects found in \code{source_dir} are processed.
#' @param source_dir Path to the directory containing source projects.
#' @param ingest Logical; if \code{FALSE}, will re-process standardized data without running
#' ingestion scripts.
#' @param is_auto Logical; if \code{TRUE}, will skip process scripts marked as manual.
#' @param force Logical; if \code{TRUE}, will ignore process frequencies
#' (will run scripts even if recently run).
#' @returns A list with processing results:
#' \itemize{
#' \item \code{timings}: How many seconds the ingestion script took to run.
#' \item \code{logs}: The captured output of the ingestion script.
#' }
#' Each entry has an entry for each source.
#'
#' A `datapackage.json` file is also created / updated in each source's `standard` directory.
#' @examples
#' \dontrun{
#' # run from a directory containing a `data` directory containing the source
#' pophive_process("source_name")
#'
#' # run without executing the ingestion script
#' pophive_process("source_name", ingest = FALSE)
#' }
#' @export

pophive_process <- function(
  name = NULL,
  source_dir = "data",
  ingest = TRUE,
  is_auto = FALSE,
  force = FALSE
) {
  sources <- if (is.null(name)) {
    list.files(
      source_dir,
      "process\\.json",
      recursive = TRUE,
      full.names = TRUE
    )
  } else {
    process_files <- paste0(source_dir, "/", name, "/process.json")
    if (any(!file.exists(process_files))) {
      cli::cli_abort(
        "missing process file{?/s}: {.emph {process_files[!file.exists(process_files)]}}"
      )
    }
    process_files
  }
  decide_to_run <- function(process_script) {
    if (is_auto && process_script$manual) return(FALSE)
    if (
      force || process_script$last_run == "" || process_script$frequency == 0L
    )
      return(TRUE)
    if (
      difftime(Sys.time(), as.POSIXct(process_script$last_run), units = "day") >
        process_script$frequency
    ) {
      return(TRUE)
    }
    FALSE
  }
  timings <- list()
  logs <- list()
  for (process_file in sources) {
    pophive_add_source(basename(dirname(process_file)), source_dir, FALSE)
    process_def <- pophive_source_process(process_file)
    name <- process_def$name
    for (si in seq_along(process_def$scripts)) {
      process_script <- process_def$scripts[[si]]
      run_current <- ingest && decide_to_run(process_script)
      base_dir <- dirname(process_file)
      script <- paste0(base_dir, "/", process_script$path)
      st <- proc.time()[[3]]
      file_ref <- if (run_current) paste0(" ({.file ", script, "})") else NULL
      cli::cli_progress_step(
        paste0("processing {.strong ", name, "}", file_ref),
        msg_failed = paste0("failed to process {.strong ", name, "}", file_ref),
        spinner = TRUE
      )
      env <- new.env()
      env$pophive_process_continue <- TRUE
      status <- if (ingest) {
        tryCatch(
          list(
            log = utils::capture.output(
              source(script, env, chdir = TRUE),
              type = "message"
            ),
            success = TRUE
          ),
          error = function(e) list(log = e$message, success = FALSE)
        )
      } else list(log = "", success = TRUE)
      logs[[name]] <- status$log
      if (run_current) {
        process_script$last_run <- Sys.time()
        process_script$run_time <- proc.time()[[3]] - st
        process_script$last_status <- status
        process_def$scripts[[si]] <- process_script
      }
      if (status$success) timings[[name]] <- process_script$run_time
      if (!env$pophive_process_continue) break
    }
    process_def_current <- pophive_source_process(process_file)
    process_def_current$scripts <- process_def$scripts
    pophive_source_process(process_file, process_def_current)
    data_files <- list.files(
      paste0(base_dir, "/standard"),
      "\\.(?:csv|parquet)"
    )
    if (length(data_files)) {
      measure_info_file <- paste0(base_dir, "/measure_info.json")
      standard_state <- as.list(tools::md5sum(c(
        measure_info_file,
        paste0(base_dir, "/standard/", data_files)
      )))
      if (!identical(process_def_current$standard_state, standard_state)) {
        measure_info <- community::data_measure_info(
          measure_info_file,
          include_empty = FALSE,
          render = TRUE,
          write = FALSE,
          open_after = FALSE,
          verbose = FALSE
        )
        measure_sources <- list()
        for (info in measure_info) {
          for (s in info$sources) {
            if (
              !is.null(s$location) &&
                !(s$location %in% names(measure_sources))
            ) {
              measure_sources[[s$location]] <- s
            }
          }
        }
        community::data_add(
          data_files,
          meta = list(
            source = unname(measure_sources),
            ids = "geography",
            time = "time",
            variables = measure_info
          ),
          dir = paste0(base_dir, "/standard"),
          pretty = TRUE,
          summarize_ids = TRUE,
          verbose = FALSE
        )
        process_def_current$standard_state <- standard_state
      }
      pophive_source_process(process_file, process_def_current)
      cli::cli_progress_done(result = if (status$success) "done" else "failed")
    } else {
      cli::cli_progress_update(
        status = "no standard data files found in {.path {process_file}}"
      )
      cli::cli_progress_done(result = "failed")
    }
  }
  invisible(list(timings = timings, logs = logs))
}
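
# Usage sketch (hypothetical "source_name" project; inert `if (FALSE)` guard):
# `force` reruns ingestion regardless of each script's `frequency`.
if (FALSE) {
  res <- pophive_process("source_name", force = TRUE)
  res$timings # seconds per source
  cat(res$logs$source_name, sep = "\n")
}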

#' Retrieve a Data File
#'
#' Load a data file from a source data project, or list versions of the file.
#'
#' @param path Path to the file.
#' @param date Date of the version to load; a \code{Date}, or \code{character} in the format
#' \code{YYYY-MM-DD}. Will match to the nearest version.
#' @param commit_hash SHA signature of the committed version;
#' can be the first 6 or so characters. Ignored if \code{date} is provided.
#' @param versions Logical; if \code{TRUE}, will return a table of available versions,
#' rather than a file path.
#' @returns If \code{versions} is \code{TRUE}, a \code{data.frame} with columns for
#' the \code{hash}, \code{author}, \code{date}, and \code{message} of each commit.
#' Otherwise, the path to a temporary file, if one was extracted.
#'
#' @examples
#' path <- "../../data/wastewater/raw/flua.csv.xz"
#' if (file.exists(path)) {
#'   # list versions
#'   versions <- pophive_get_file(path, versions = TRUE)
#'   print(versions[, c("date", "hash")])
#'
#'   # extract a version to a temporary file
#'   temp_path <- pophive_get_file(path, "2025-05")
#'   basename(temp_path)
#' }
#'
#' @export

pophive_get_file <- function(
  path,
  date = NULL,
  commit_hash = NULL,
  versions = FALSE
) {
  if (missing(path)) cli::cli_abort("specify a path")
  if (!file.exists(path)) cli::cli_abort("path does not exist")
  vs <- data.frame(
    hash = character(),
    author = character(),
    date = character(),
    message = character()
  )
  if (versions || !is.null(date)) {
    commits <- sys::exec_internal("git", c("log", path))
    if (commits$status == 0L) {
      commits <- do.call(
        rbind,
        Filter(
          function(e) length(e) == 4L,
          strsplit(
            strsplit(rawToChar(commits$stdout), "commit ", fixed = TRUE)[[1L]],
            "\\n+(?:[^:]+:)?\\s*"
          )
        )
      )
      colnames(commits) <- colnames(vs)
      vs <- as.data.frame(commits)
    } else {
      cli::cli_abort("failed to git log: {rawToChar(commits$stderr)}")
    }
  }
  if (versions) return(vs)
  if (!is.null(date)) {
    if (nrow(vs) == 0L) return(path)
    if (is.character(date))
      date <- as.POSIXct(
        date,
        tryFormats = c(
          "%Y-%m-%d %H:%M:%S",
          "%Y-%m-%d %H:%M",
          "%Y-%m-%d",
          "%Y-%m",
          "%Y"
        ),
        tz = "UTC"
      )
    commit_hash <- vs$hash[which.min(abs(
      as.POSIXct(vs$date, "%a %b %d %H:%M:%S %Y", tz = "UTC") - date
    ))]
  }
  if (is.null(commit_hash)) return(path)
  name_parts <- strsplit(basename(path), ".", fixed = TRUE)[[1L]]
  out_path <- paste0(
    tempdir(),
    "/",
    name_parts[[1L]],
    "-",
    substring(commit_hash, 1L, 6L),
    ".",
    paste(name_parts[-1L], collapse = ".")
  )
  if (file.exists(out_path)) return(out_path)
  status <- sys::exec_wait(
    "git",
    c("show", paste0(commit_hash, ":", path)),
    std_out = out_path
  )
  if (status != 0L)
    cli::cli_abort("failed to git show commit {commit_hash}")
  out_path
}
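
# Usage sketch (hypothetical tracked file; inert `if (FALSE)` guard): versions
# come from `git log`, and a date extracts the nearest committed version to a
# temporary file.
if (FALSE) {
  versions <- pophive_get_file("raw/flua.csv.xz", versions = TRUE)
  old <- pophive_get_file("raw/flua.csv.xz", date = "2025-05")
  basename(old) # e.g., "flua-<hash>.csv.xz"
}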

#' Add a Source Project
#'
#' Establishes a new data source project, used to collect and prepare data from a new source.
#'
#' @param name Name of the source.
#' @param base_dir Path to the directory containing sources.
#' @param open_after Logical; if \code{FALSE}, will not open the project.
#' @returns Nothing; creates default files and directories.
#' @section Project:
#'
#' Within a source project, there are two files to edit:
#' \itemize{
#' \item \strong{\code{ingest.R}}: This is the primary script, which is automatically rerun.
#' It should store raw data and resources in \code{raw/} where possible,
#' then use what's in \code{raw/} to produce standard-format files in \code{standard/}.
#' This file is sourced from its location during processing, so any system paths
#' must be relative to itself.
#' \item \strong{\code{measure_info.json}}: This is where you can record information
#' about the variables included in the standardized data files.
#' See \code{\link[community]{data_measure_info}}.
#' }
#'
#' @examples
#' data_source_dir <- tempdir()
#' pophive_add_source("source_name", data_source_dir)
#' list.files(paste0(data_source_dir, "/source_name"))
#'
#' @export

pophive_add_source <- function(
  name,
  base_dir = "data",
  open_after = interactive()
) {
  if (missing(name)) cli::cli_abort("specify a name")
  name <- gsub("[^A-Za-z0-9]+", "_", name)
  base_path <- paste0(base_dir, "/", name, "/")
  dir.create(base_path, showWarnings = FALSE, recursive = TRUE)
  dir.create(paste0(base_path, "raw"), showWarnings = FALSE, recursive = TRUE)
  dir.create(
    paste0(base_path, "standard"),
    showWarnings = FALSE,
    recursive = TRUE
  )
  paths <- paste0(
    base_path,
    c(
      "measure_info.json",
      "ingest.R",
      "project.Rproj",
      "standard/datapackage.json",
      "process.json",
      "README.md"
    )
  )
  if (!file.exists(paths[[1]])) {
    community::data_measure_info(
      paths[[1]],
      example_variable = list(),
      verbose = FALSE,
      open_after = FALSE
    )
  }
  if (!file.exists(paths[[2]])) {
    writeLines(
      paste0(
        c(
          "#",
          "# Download",
          "#",
          "",
          "# add files to the `raw` directory",
          "",
          "#",
          "# Reformat",
          "#",
          "",
          "# read from the `raw` directory, and write to the `standard` directory",
          ""
        ),
        collapse = "\n"
      ),
      paths[[2]]
    )
  }
  if (!file.exists(paths[[3]])) {
    writeLines("Version: 1.0\n", paths[[3]])
  }
  if (!file.exists(paths[[4]]))
    community::init_data(
      name,
      dir = paste0(base_path, "standard"),
      quiet = TRUE
    )

  if (!file.exists(paths[[5]]))
    jsonlite::write_json(
      list(
        name = name,
        scripts = list(
          list(
            path = "ingest.R",
            manual = FALSE,
            frequency = 0L,
            last_run = "",
            run_time = "",
            last_status = list(log = "", success = TRUE)
          )
        ),
        checked = "",
        check_results = list()
      ),
      paths[[5]],
      auto_unbox = TRUE,
      pretty = TRUE
    )
  if (!file.exists(paths[[6]])) {
    writeLines(
      paste0(
        c(
          paste("#", name),
          "",
          "This is a PopHIVE data source project, initialized with `pophive::pophive_add_source`.",
          "",
          "You can use the `pophive` package to check the project:",
          "",
          "```R",
          paste0('pophive_check_sources("', name, '", "..")'),
          "```",
          "",
          "And process it:",
          "",
          "```R",
          paste0('pophive_process("', name, '", "..")'),
          "```"
        ),
        collapse = "\n"
      ),
      paths[[6]]
    )
  }
  if (open_after) rstudioapi::openProject(paths[[3]], newSession = TRUE)
}
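
# Usage sketch (temp directory; inert `if (FALSE)` guard): creation is
# idempotent, so it is safe to call on an existing project.
if (FALSE) {
  dir <- tempdir()
  pophive_add_source("example_source", dir, open_after = FALSE)
  jsonlite::read_json(paste0(dir, "/example_source/process.json"))$name
}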

#' Standardize Epic Data
#'
#' Standardize a raw Epic data table.
#'
#' @param raw_data A raw Epic data table, such as the \code{data} entry
#' returned from \link{pophive_read_epic}.
#' @returns A standardized form of \code{raw_data}.
#' @section Standardization:
#' \itemize{
#' \item Collapse location columns (\code{state} or \code{county}) to a single
#' \code{geography} column, and region names to IDs.
#' \item Collapse time columns (\code{year}, \code{month}, or \code{week}) to a single
#' \code{time} column, and clean up value formatting.
#' \item Drop rows with no values across value columns.
#' }
#' @examples
#' \dontrun{
#' raw_data <- pophive_read_epic("data/epic/raw/flu.csv.xz")
#' standard_data <- pophive_standardize_epic(raw_data$data)
#' }
#'
#' @export

pophive_standardize_epic <- function(raw_data) {
  region_names <- epic_id_maps$regions
  names(region_names) <- gsub(
    " (?:CITY AND BOROUGH|BOROUGH|PARISH|MUNICIPALITY|MUNICIPIO)|[.']",
    "",
    toupper(names(region_names))
  )
  cols <- colnames(raw_data)
  time_col <- which(cols == "year")
  if (length(time_col)) {
    colnames(raw_data)[time_col] <- "time"
    raw_data$time <- as.integer(substring(
      raw_data$time,
      nchar(raw_data$time) - 4L
    ))
  }
  month_col <- which(cols == "month")
  if (length(month_col)) {
    raw_data$time <- paste0(
      raw_data$time,
      "-",
      epic_id_maps$months[raw_data$month]
    )
  }
  week_col <- which(cols == "week")
  if (length(week_col)) {
    raw_data$time <- paste0(
      raw_data$time,
      "-",
      vapply(
        strsplit(raw_data$week, "[^A-Za-z0-9]"),
        function(p) {
          paste0(
            epic_id_maps$months[[p[[1L]]]],
            "-",
            formatC(as.integer(p[[2L]]), width = 2L, flag = "0")
          )
        },
        ""
      )
    )
  }
  geo_col <- grep("^(?:state|county)", cols)
  if (length(geo_col)) {
    colnames(raw_data)[geo_col] <- "geography"
    raw_data$geography <- toupper(raw_data$geography)
    missing_geo <- !(raw_data$geography %in% names(region_names))
    if (any(missing_geo)) {
      geo <- sub(
        "LA ",
        "LA",
        sub("^SAINT", "ST", raw_data$geography[missing_geo]),
        fixed = TRUE
      )
      if (any(grepl(", VA", geo, fixed = TRUE))) {
        geo[geo == "SALEM, VA"] <- "SALEM CITY, VA"
        geo[geo == "RADFORD, VA"] <- "RADFORD CITY, VA"
        geo[geo == "DONA ANA, NM"] <- "DO\u00d1A ANA, NM"
        geo[geo == "MATANUSKA SUSITNA, AK"] <- "MATANUSKA-SUSITNA, AK"
      }
      raw_data$geography[missing_geo] <- geo
    }
    missing_regions <- raw_data$geography[
      !(raw_data$geography %in% names(region_names))
    ]
    if (length(missing_regions)) {
      cli::cli_warn(
        'unrecognized regions: {paste(unique(missing_regions), collapse = "; ")}'
      )
    }
    raw_data$geography <- region_names[raw_data$geography]
    raw_data <- raw_data[!is.na(raw_data$geography), ]
  }
  raw_data <- raw_data[,
    !(colnames(raw_data) %in% c("state", "county", "year", "month", "week"))
  ]
  # drop rows with no values outside of the ID columns
  raw_data[
    rowSums(
      !is.na(raw_data[,
        !(colnames(raw_data) %in% c("geography", "time", "age"))
      ])
    ) !=
      0L,
  ]
}
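
# Usage sketch (hypothetical staging file; inert `if (FALSE)` guard): pass the
# `data` table from pophive_read_epic(), not the full list it returns.
if (FALSE) {
  epic <- pophive_read_epic("raw/staging/flu.csv")
  standard <- pophive_standardize_epic(epic$data)
  head(standard[, c("geography", "time")])
}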

#' Interact with a Source Process File
#'
#' Read or update the current source process file.
#'
#' @param path Path to the process JSON file.
#' @param updated An updated version of the process definition. If specified, will
#' write this as the new process file, rather than reading any existing file.
#' @returns The process definition of the source project.
#' @examples
#' epic_process_file <- "../../epic/process.json"
#' if (file.exists(epic_process_file)) {
#'   pophive_source_process(path = epic_process_file)
#' }
#' @export

pophive_source_process <- function(path = "process.json", updated = NULL) {
  if (is.null(updated)) {
    if (!file.exists(path)) cli::cli_abort("process file {path} does not exist")
    jsonlite::read_json(path)
  } else {
    jsonlite::write_json(updated, path, auto_unbox = TRUE, pretty = TRUE)
    updated
  }
}
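
# Usage sketch (run within a source project; inert `if (FALSE)` guard):
# read-modify-write of the process definition.
if (FALSE) {
  def <- pophive_source_process("process.json")
  def$scripts[[1L]]$manual <- TRUE
  pophive_source_process("process.json", def)
}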