Module factfinder.main

Expand source code
import asyncio
import itertools
import json
import logging
import os
from functools import lru_cache, partial
from multiprocessing import Pool
from pathlib import Path

import numpy as np
import pandas as pd
from cached_property import cached_property
from census import Census

from .aggregated_geography import AggregatedGeography
from .median import get_median, get_median_moe
from .special import special_variable_options
from .utils import get_c, get_p, get_z, outliers
from .variable import Variable


class Pff:
    def __init__(self, api_key, year=2018, source="acs"):
        self.c = Census(api_key)
        self.year = year
        self.source = source
        self.state = 36
        self.counties = ["005", "081", "085", "047", "061"]

        self.client_options = {
            "D": self.c.acs5dp,
            "S": self.c.acs5st,
            "P": self.c.sf1,
            "B": self.c.acs5,
        }

        self.agg_geo = AggregatedGeography(year)
        self.aggregate_vertical_options = self.agg_geo.aggregate_vertical_options
        self.special_variable_options = special_variable_options
        self.outliers = outliers

        # Contains variables where the numerator comes from a DP dataset, but pff uses a different base than the census
        self.profile_only_exceptions = [
            "abroad",
            "cvlfuem2",
            "dfhsdfcnt",
            "dfhssmcnt",
            "dfhsus",
            "hh5",
            "oochu4",
            "p65plbwpv",
            "pbwpv",
            "pu18bwpv",
        ]

    @cached_property
    def metadata(self) -> list:
        with open(
            f"{Path(__file__).parent}/data/{self.source}/{self.year}/metadata.json"
        ) as f:
            return json.load(f)

    @cached_property
    def median(self) -> list:
        with open(
            f"{Path(__file__).parent}/data/{self.source}/{self.year}/median.json"
        ) as f:
            return json.load(f)

    @cached_property
    def special(self) -> list:
        with open(
            f"{Path(__file__).parent}/data/{self.source}/{self.year}/special.json"
        ) as f:
            return json.load(f)

    @cached_property
    def aggregated_geography(self) -> list:
        list3d = [
            [list(k.keys()) for k in i.values()]
            for i in self.aggregate_vertical_options.values()
        ]
        list2d = itertools.chain.from_iterable(list3d)
        return list(set(itertools.chain.from_iterable(list2d)))

    @cached_property
    def profile_only_variables(self) -> list:
        return [
            i["pff_variable"]
            for i in self.metadata
            if (
                i["census_variable"][0][0:2] == "DP"
                and len(i["census_variable"]) == 1
                and i["pff_variable"] not in self.profile_only_exceptions
            )
        ]

    @cached_property
    def base_variables(self) -> list:
        """
        returns a list of base variables in the format of pff_variable
        """
        return list(set([i["base_variable"] for i in self.metadata]))

    @cached_property
    def median_variables(self) -> list:
        """
        returns a list of median variables in the format of pff_variable
        """
        return list(self.median.keys())

    @cached_property
    def special_variables(self) -> list:
        """
        returns a list of special calculation variables in the format
        of pff_variable
        """
        return [i["pff_variable"] for i in self.special]

    def get_special_base_variables(self, pff_variable) -> list:
        """
        returns a list of special calculation base variables in the format
        of pff_variable
        """
        special = list(
            filter(lambda x: x["pff_variable"] == pff_variable, self.special)
        )
        return special[0]["base_variables"]

    def median_ranges(self, pff_variable) -> dict:
        """
        given median variable in the format of pff_variable
        returns the ranges object for the median variable.
        e.g.
        {
            'mdpop0t4': [0, 4.9999],
            'mdpop5t9': [5, 9.9999],
            ...
        }
        """
        return self.median[pff_variable]["ranges"]

    def median_design_factor(self, pff_variable) -> float:
        """
        given median variable in the form of pff_variable
        returns the design_factor needed to calculate the
        median moe
        """
        return self.median[pff_variable]["design_factor"]

    def calculate(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        Main user interface, going through different following steps:
        1. calculate_c_e_m_p_z
        2. rounding
        3. assigning geoid
        """
        # 0. Initialize Variable class instance
        v = self.create_variable(pff_variable)
        # 1. get calculated values (c,e,m,p,z)
        df = self.calculate_c_e_m_p_z(v, geotype)
        # 2. rounding
        df = self.rounding(df, v.rounding)
        # 3. last round of data cleaning
        df = self.cleaning(df, v, geotype)
        return df

    def rounding(self, df: pd.DataFrame, digits: int) -> pd.DataFrame:
        """
        Round c, e, m, p, z fields based on rounding digits from metadata
        """
        df["c"] = df["c"].round(1)
        df["e"] = df["e"].round(digits)
        df["m"] = df["m"].round(digits)
        df["p"] = df["p"].round(1)
        df["z"] = df["z"].round(1)
        return df

    def cleaning(self, df: pd.DataFrame, v: Variable, geotype: str) -> pd.DataFrame:
        """
        last step data cleaning based on rules below:
        """
        # negative values are invalid
        df.loc[df.c < 0, "c"] = np.nan
        df.loc[df.e < 0, "e"] = np.nan
        df.loc[df.m < 0, "m"] = np.nan
        df.loc[df.p < 0, "p"] = np.nan
        df.loc[df.z < 0, "z"] = np.nan

        # p has to be less or equal to 100
        df.loc[df.p > 100, "p"] = np.nan

        # If p = np.nan/, then z = np.nan
        df.loc[(df.p.isna()) | (df.p == 100), "z"] = np.nan

        df.loc[df.e == 0, "c"] = np.nan
        df.loc[df.e == 0 & ~df.pff_variable.isin(self.base_variables), "m"] = np.nan
        df.loc[
            df.e == 0 & df.pff_variable.isin(self.base_variables) & df.m.isna(), "m"
        ] = 0
        df.loc[df.e == 0, "p"] = np.nan
        df.loc[df.e == 0, "z"] = np.nan

        df.loc[
            df.geotype.isin(["borough", "city"])
            & df.pff_variable.isin(self.base_variables)
            & df.c.isna(),
            "c",
        ] = 0

        df.loc[
            df.geotype.isin(["borough", "city"])
            & df.pff_variable.isin(self.base_variables)
            & df.m.isna(),
            "m",
        ] = 0

        df.loc[df.pff_variable.isin(self.base_variables), "p"] = 100

        df.loc[df.pff_variable.isin(self.base_variables), "z"] = np.nan

        return df

    def calculate_multiple_e_m(self, pff_variables: list, geotype: str) -> pd.DataFrame:
        """
        given a list of pff_variables, and geotype, calculate multiple
        variables e, m at the same time using multiprocessing
        """
        _calculate_e_m = partial(self.calculate_e_m, geotype=geotype)
        with Pool(5) as download_pool:
            dfs = download_pool.map(_calculate_e_m, pff_variables)
        df = pd.concat(dfs)
        del dfs
        return df

    def calculate_special_e_m(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        Given pff_variable and geotype, download and calculate the variable.
        Used for variables requiring special horizontal aggregation techniques.
        """
        base_variables = self.get_special_base_variables(pff_variable)
        df = self.calculate_multiple_e_m(base_variables, geotype)
        df = self.special_variable_options[pff_variable](df, base_variables)
        df["pff_variable"] = pff_variable
        df["geotype"] = geotype
        return df[["census_geoid", "pff_variable", "geotype", "e", "m"]]

    def calculate_median_e_m(self, pff_variable, geotype) -> pd.DataFrame:
        """
        Given median variable in the form of pff_variable and geotype
        calculate the median and median moe
        """
        # 1. Initialize
        ranges = self.median_ranges(pff_variable)
        design_factor = self.median_design_factor(pff_variable)
        rounding = self.create_variable(pff_variable).rounding

        # 2. Calculate each variable that goes into median calculation
        df = self.calculate_multiple_e_m(list(ranges.keys()), geotype)

        # 3. create a pivot table with census_geoid as the index, and
        # pff_variable as column names. df_pivoted.e -> the estimation dataframe
        df_pivoted = df.loc[:, ["census_geoid", "pff_variable", "e"]].pivot(
            index="census_geoid", columns="pff_variable", values=["e"]
        )

        # Empty dataframe to store the results
        results = pd.DataFrame()
        results["census_geoid"] = df_pivoted.index
        results["pff_variable"] = pff_variable
        results["geotype"] = geotype

        # 4. calculate median estimation using get_median
        results["e"] = (
            df_pivoted.e.loc[
                df_pivoted.e.index == results.census_geoid, list(ranges.keys())
            ]
            .apply(lambda row: get_median(ranges, row), axis=1)
            .to_list()
        )

        # 5. Calculate median moe using get_median_moe
        # Note that median moe calculation needs the median estimation
        # so we seperated df_pivoted.m out as a seperate dataframe
        e = df_pivoted.e.copy()
        e["e"] = results.loc[e.index == results.census_geoid, "e"].to_list()
        results["m"] = (
            e.loc[e.index == results.census_geoid, list(ranges.keys()) + ["e"]]
            .apply(lambda row: get_median_moe(ranges, row, design_factor), axis=1)
            .to_list()
        )

        # 6. return the output, containing the median, and all the variables used
        return results

    def create_variable(self, pff_variable: str) -> Variable:
        """
        given pff_variable name, return a Variable object
        """
        meta = list(filter(lambda x: x["pff_variable"] == pff_variable, self.metadata))[
            0
        ]
        return Variable(meta)

    @lru_cache(maxsize=1048)
    def get_aggregate_vertical(self, source: str, geotype: str):
        """
        this function will aggregate over geographies,
        e.g. aggregate over tracts to get NTA level data
        """
        source = "acs" if source != "decennial" else source
        to_geotype = geotype
        if geotype not in self.aggregated_geography:
            aggregate_vertical = lambda df: df
            from_geotype = geotype
        else:
            options = self.aggregate_vertical_options.get(source)
            for k, v in options.items():
                if geotype in v.keys():
                    from_geotype = k
                    aggregate_vertical = options[k][geotype]
        return from_geotype, aggregate_vertical

    @lru_cache(maxsize=1048)
    def calculate_e_m(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        Given pff_variable and geotype, download and calculate the variable
        """
        cache_path = f".cache/{geotype}/{pff_variable}.pkl"
        if os.path.isfile(cache_path):
            df = pd.read_pickle(cache_path)
        else:
            # 1. create variable
            v = self.create_variable(pff_variable)

            # 2. pulling data from census site and aggregating
            from_geotype, aggregate_vertical = self.get_aggregate_vertical(
                v.source, geotype
            )
            df = self.aggregate_horizontal(v, from_geotype)
            df = aggregate_vertical(df)
            os.makedirs(f".cache/{geotype}", exist_ok=True)
            self.write_to_cache(df, cache_path)
        return df

    def write_to_cache(self, df: pd.DataFrame, path: str):
        if not os.path.isfile(path):
            df.to_pickle(path)
        return None

    def calculate_e_m_p_z(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        This function is used for calculating profile variables only with
        non-aggregated-geography geotype
        """
        # 1. create variable
        v = self.create_variable(pff_variable)
        # hard coding because by definition profile-only
        #  variables only has 1 census variable
        census_variable = v.census_variable[0]
        # 2. pulling data from census site and aggregating
        df = asyncio.run(self.download_variable(self.download_e_m_p_z, v, geotype))
        df["pff_variable"] = pff_variable
        df["geotype"] = geotype
        # 3. Change field names
        columns = {
            census_variable + "E": "e",
            census_variable + "M": "m",
            census_variable + "PE": "p",
            census_variable + "PM": "z",
        }
        df = self.create_census_geoid(df, geotype)
        df = df.rename(columns=columns)
        return df[["census_geoid", "pff_variable", "geotype", "e", "m", "p", "z"]]

    def calculate_poverty_p_z(self, v: Variable, geotype: str) -> pd.DataFrame:
        """
        For below poverty vars, the percent and percent MOE are taken from the ACS,
        but they come from E and M fields, not PE and PM fields. This function
        calculates the E and M for the associated percent variable, then renames as
        P and Z to join with the count variable.
        """
        pct_df = self.calculate_e_m(f"{v.pff_variable}_pct", geotype=geotype)
        pz = pct_df[["census_geoid", "geotype", "e", "m"]].rename(
            columns={"e": "p", "m": "z"}
        )
        return pz

    def calculate_c_e_m_p_z(self, v: Variable, geotype: str) -> pd.DataFrame:
        """
        this function will calculate e, m first, then based on if the
        variable is a median variable or base variable, it would calculate
        p and z accordingly. Note that c calculation is the same for all variables
        """
        output_fields = [
            "census_geoid",
            "pff_variable",
            "geotype",
            "c",
            "e",
            "m",
            "p",
            "z",
        ]
        # If pff_variable is a median variable, then we would need
        # to calculate using calculate_median_e_m for aggregated geography
        # there's no need to calculate p, z for median variables
        if (
            v.pff_variable in self.profile_only_variables
            and geotype not in self.aggregated_geography
        ):
            df = self.calculate_e_m_p_z(v.pff_variable, geotype)

        elif v.pff_variable in self.median_variables:
            df = (
                self.calculate_median_e_m(v.pff_variable, geotype)
                if geotype in self.aggregated_geography
                else self.calculate_e_m(v.pff_variable, geotype)
            )
            df["p"] = 100 if geotype in ["city", "borough"] else np.nan
            df["z"] = np.nan
        else:
            df = (
                self.calculate_e_m(v.pff_variable, geotype)
                if not (
                    (
                        v.pff_variable in self.special_variables
                        and geotype in self.aggregated_geography
                    )
                    or (v.pff_variable == "wrkrnothm")
                )
                # We only calculate special variables for aggregated geographies,
                # with the exception of 'wrkrnothm' (calculate for both aggregated and non-aggregated geographies)
                else self.calculate_special_e_m(v.pff_variable, geotype)
            )
            # If pff_variable is not base_variable, then p,z
            # are calculated against the base variable e(agg_e), m(agg_m)
            if v.pff_variable not in self.base_variables:
                if v.pff_variable in ["pbwpv", "pu18bwpv", "p65plbwpv"]:
                    # special case for poverty variables
                    df_pz = self.calculate_poverty_p_z(v, geotype)
                    df = df.merge(df_pz, on=["census_geoid", "geotype"])
                elif v.base_variable != "nan":
                    if (
                        v.base_variable in self.special_variables
                        and geotype in self.aggregated_geography
                    ):
                        df_base = self.calculate_special_e_m(v.base_variable, geotype)
                    if (
                        v.base_variable in self.median_variables
                        and geotype in self.aggregated_geography
                    ):
                        df_base = self.calculate_median_e_m(v.base_variable, geotype)
                    else:
                        df_base = self.calculate_e_m(v.base_variable, geotype)

                    df = df.merge(
                        df_base[["census_geoid", "e", "m"]].rename(
                            columns={"e": "agg_e", "m": "agg_m"}
                        ),
                        how="left",
                        on="census_geoid",
                    )
                    del df_base
                    df["p"] = df.apply(
                        lambda row: get_p(row["e"], row["agg_e"]), axis=1
                    )
                    df["z"] = df.apply(
                        lambda row: get_z(
                            row["e"], row["m"], row["p"], row["agg_e"], row["agg_m"]
                        ),
                        axis=1,
                    )
                else:
                    # special case for grnorntpd, smpntc,
                    # grpintc, nmsmpntc, cni1864_2, cvlf18t64
                    df["p"] = np.nan
                    df["z"] = np.nan
            # If pff_variable is a base variable, then
            # p = 100 for city and borough level, np.nan otherwise
            # z = np.nan for all levels of geography
            else:
                df["p"] = 100 if geotype in ["city", "borough"] else np.nan
                df["z"] = np.nan

        df["c"] = df.apply(lambda row: get_c(row["e"], row["m"]), axis=1)
        return df[output_fields]

    def create_census_variables(self, census_variable: list) -> (list, list):
        """
        Based on the census variables, spit out the
        M variables and E variables
        e.g. ["B01001_044"] -> ["B01001_044M"], ["B01001_044E"]
        """
        E_variables = [i + "E" for i in census_variable if i[0] != "P"]
        if len(E_variables) == 0:  # Only decennial, pass raw variable name
            E_variables = census_variable
        M_variables = [i + "M" for i in census_variable if i[0] != "P"]
        return E_variables, M_variables

    def aggregate_horizontal(self, v: Variable, geotype: str) -> pd.DataFrame:
        """
        this function will aggregate multiple census_variables into 1 pff_variable
        e.g. ["B01001_044","B01001_020"] -> "mdpop65t66"
        """
        E_variables, M_variables = self.create_census_variables(v.census_variable)
        df = asyncio.run(self.download_variable(self.download_e_m, v, geotype))

        # Aggregate variables horizontally
        df["pff_variable"] = v.pff_variable
        df["geotype"] = geotype
        df["e"] = df[E_variables].sum(axis=1)
        df["m"] = (
            (df[M_variables] ** 2).sum(axis=1) ** 0.5
            if v.source != "decennial"
            else np.nan
        )

        # Create geoid
        df = self.create_census_geoid(df, geotype)

        # Output
        return df[["census_geoid", "pff_variable", "geotype", "e", "m"]]

    def create_census_geoid(self, df: pd.DataFrame, geotype: str) -> pd.DataFrame:
        if geotype == "tract":
            df["census_geoid"] = df["state"] + df["county"] + df["tract"]
        elif geotype == "borough":
            df["census_geoid"] = df["state"] + df["county"]
        elif geotype == "city":
            df["census_geoid"] = df["state"] + df["place"]
        elif geotype == "block":
            df["census_geoid"] = df["state"] + df["county"] + df["tract"] + df["block"]
        elif geotype == "block group":
            df["census_geoid"] = (
                df["state"] + df["county"] + df["tract"] + df["block group"]
            )
        return df

    async def download_variable(
        self, download_function, v: Variable, geotype: str
    ) -> pd.DataFrame:
        """
        Given a list of census_variables, and geotype, download data from acs/decennial api
        Note that depends on if we are taking PE/PM variables directly from census API,
        we will pass in self.download_e_m_p_z (data_profile_only) or self.download_e_m (generic)
        as download_func
        """
        geoqueries = self.get_geoquery(geotype)
        tasks = []
        for gq in geoqueries:
            tasks.append(asyncio.create_task(download_function(gq, v)))
        dfs = await asyncio.gather(*tasks)
        return pd.concat(dfs)

    async def download_e_m_p_z(self, geoquery: dict, v: Variable) -> pd.DataFrame:
        """
        This function is for downloading non-aggregated-geotype and data profile only
        variables. It will return e, m, p, z variables for a single pff variable.
        """
        # single source (data profile) only, so safe to set a default
        client = self.c.acs5dp
        census_variable = v.census_variable[0]
        E_variables = census_variable + "E"
        M_variables = census_variable + "M"
        PE_variables = census_variable + "PE"
        PM_variables = census_variable + "PM"
        variables = [E_variables, M_variables, PE_variables, PM_variables]
        df = pd.DataFrame(
            client.get(("NAME", ",".join(variables)), geoquery, year=self.year)
        )
        # If E is an outlier, then set M as Nan
        for var in variables:  # Enforce type safety
            df[var] = df[var].astype("float64")
        df.loc[df[E_variables].isin(self.outliers), M_variables] = np.nan
        df.loc[df[E_variables] == 0, M_variables] = 0
        # Replace all outliers as Nan
        df = df.replace(self.outliers, np.nan)
        return df

    async def download_e_m(self, geoquery: dict, v: Variable) -> pd.DataFrame:
        """
        this function works in conjunction with download_variable,
        and is only created to facilitate multiprocessing, this function
        if for generic variable calculation, returns e, m
        """
        # Get unique sources
        sources = set([i[0] for i in v.census_variable])
        frames = []
        for source in sources:
            # Create Variables for given source and set client
            variables = [i for i in v.census_variable if i[0] == source]
            client = self.client_options.get(source, self.c.acs5)
            E_variables, M_variables = self.create_census_variables(variables)
            frames.append(
                pd.DataFrame(
                    client.get(
                        ("NAME", ",".join(E_variables + M_variables)),
                        geoquery,
                        year=self.year,
                    )
                )
            )
        # Combine results from each source by joining on geo name
        df = frames[0]
        for i in frames[1:]:
            df = pd.merge(
                df,
                i[i.columns.difference(["state", "county", "tract", "place"])],
                left_on="NAME",
                right_on="NAME",
            )
        del frames
        # Enforce type safety
        for i in v.census_variable:
            if i[0] != "P":
                df[f"{i}E"] = df[f"{i}E"].astype("float64")
                df[f"{i}M"] = df[f"{i}M"].astype("float64")
                # If E is zero, then set M as zero
                df.loc[df[f"{i}E"] == 0, f"{i}M"] = 0
                # If E is an outlier, then set M as Nan
                df.loc[df[f"{i}E"].isin(self.outliers), f"{i}M"] = np.nan
            else:
                df[i] = df[i].astype("float64")
        # Replace all outliers as Nan
        df = df.replace(self.outliers, np.nan)
        return df

    def get_geoquery(self, geotype: str) -> list:
        """
        given geotype, this function will create a list of geographic queries
        we would need to pull NYC level data.
        """
        if geotype == "tract":
            return [
                {"for": "tract:*", "in": f"state:{self.state} county:{county}"}
                for county in self.counties
            ]
        elif geotype == "borough":
            return [
                {"for": f"county:{county}", "in": f"state:{self.state}"}
                for county in self.counties
            ]
        elif geotype == "city":
            return [{"for": "place:51000", "in": f"state:{self.state}"}]

        elif geotype == "block":
            return [
                {"for": "block:*", "in": f"state:{self.state} county:{county}"}
                for county in self.counties
            ]

        elif geotype in "block group":
            return [
                {"for": "block group:*", "in": f"state:{self.state} county:{county}"}
                for county in self.counties
            ]

Classes

class Pff (api_key, year=2018, source='acs')
Expand source code
class Pff:
    def __init__(self, api_key, year=2018, source="acs"):
        self.c = Census(api_key)
        self.year = year
        self.source = source
        self.state = 36
        self.counties = ["005", "081", "085", "047", "061"]

        self.client_options = {
            "D": self.c.acs5dp,
            "S": self.c.acs5st,
            "P": self.c.sf1,
            "B": self.c.acs5,
        }

        self.agg_geo = AggregatedGeography(year)
        self.aggregate_vertical_options = self.agg_geo.aggregate_vertical_options
        self.special_variable_options = special_variable_options
        self.outliers = outliers

        # Contains variables where the numerator comes from a DP dataset, but pff uses a different base than the census
        self.profile_only_exceptions = [
            "abroad",
            "cvlfuem2",
            "dfhsdfcnt",
            "dfhssmcnt",
            "dfhsus",
            "hh5",
            "oochu4",
            "p65plbwpv",
            "pbwpv",
            "pu18bwpv",
        ]

    @cached_property
    def metadata(self) -> list:
        with open(
            f"{Path(__file__).parent}/data/{self.source}/{self.year}/metadata.json"
        ) as f:
            return json.load(f)

    @cached_property
    def median(self) -> list:
        with open(
            f"{Path(__file__).parent}/data/{self.source}/{self.year}/median.json"
        ) as f:
            return json.load(f)

    @cached_property
    def special(self) -> list:
        with open(
            f"{Path(__file__).parent}/data/{self.source}/{self.year}/special.json"
        ) as f:
            return json.load(f)

    @cached_property
    def aggregated_geography(self) -> list:
        list3d = [
            [list(k.keys()) for k in i.values()]
            for i in self.aggregate_vertical_options.values()
        ]
        list2d = itertools.chain.from_iterable(list3d)
        return list(set(itertools.chain.from_iterable(list2d)))

    @cached_property
    def profile_only_variables(self) -> list:
        return [
            i["pff_variable"]
            for i in self.metadata
            if (
                i["census_variable"][0][0:2] == "DP"
                and len(i["census_variable"]) == 1
                and i["pff_variable"] not in self.profile_only_exceptions
            )
        ]

    @cached_property
    def base_variables(self) -> list:
        """
        returns a list of base variables in the format of pff_variable
        """
        return list(set([i["base_variable"] for i in self.metadata]))

    @cached_property
    def median_variables(self) -> list:
        """
        returns a list of median variables in the format of pff_variable
        """
        return list(self.median.keys())

    @cached_property
    def special_variables(self) -> list:
        """
        returns a list of special calculation variables in the format
        of pff_variable
        """
        return [i["pff_variable"] for i in self.special]

    def get_special_base_variables(self, pff_variable) -> list:
        """
        returns a list of special calculation base variables in the format
        of pff_variable
        """
        special = list(
            filter(lambda x: x["pff_variable"] == pff_variable, self.special)
        )
        return special[0]["base_variables"]

    def median_ranges(self, pff_variable) -> dict:
        """
        given median variable in the format of pff_variable
        returns the ranges object for the median variable.
        e.g.
        {
            'mdpop0t4': [0, 4.9999],
            'mdpop5t9': [5, 9.9999],
            ...
        }
        """
        return self.median[pff_variable]["ranges"]

    def median_design_factor(self, pff_variable) -> float:
        """
        given median variable in the form of pff_variable
        returns the design_factor needed to calculate the
        median moe
        """
        return self.median[pff_variable]["design_factor"]

    def calculate(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        Main user interface, going through different following steps:
        1. calculate_c_e_m_p_z
        2. rounding
        3. assigning geoid
        """
        # 0. Initialize Variable class instance
        v = self.create_variable(pff_variable)
        # 1. get calculated values (c,e,m,p,z)
        df = self.calculate_c_e_m_p_z(v, geotype)
        # 2. rounding
        df = self.rounding(df, v.rounding)
        # 3. last round of data cleaning
        df = self.cleaning(df, v, geotype)
        return df

    def rounding(self, df: pd.DataFrame, digits: int) -> pd.DataFrame:
        """
        Round c, e, m, p, z fields based on rounding digits from metadata
        """
        df["c"] = df["c"].round(1)
        df["e"] = df["e"].round(digits)
        df["m"] = df["m"].round(digits)
        df["p"] = df["p"].round(1)
        df["z"] = df["z"].round(1)
        return df

    def cleaning(self, df: pd.DataFrame, v: Variable, geotype: str) -> pd.DataFrame:
        """
        last step data cleaning based on rules below:
        """
        # negative values are invalid
        df.loc[df.c < 0, "c"] = np.nan
        df.loc[df.e < 0, "e"] = np.nan
        df.loc[df.m < 0, "m"] = np.nan
        df.loc[df.p < 0, "p"] = np.nan
        df.loc[df.z < 0, "z"] = np.nan

        # p has to be less or equal to 100
        df.loc[df.p > 100, "p"] = np.nan

        # If p = np.nan/, then z = np.nan
        df.loc[(df.p.isna()) | (df.p == 100), "z"] = np.nan

        df.loc[df.e == 0, "c"] = np.nan
        df.loc[df.e == 0 & ~df.pff_variable.isin(self.base_variables), "m"] = np.nan
        df.loc[
            df.e == 0 & df.pff_variable.isin(self.base_variables) & df.m.isna(), "m"
        ] = 0
        df.loc[df.e == 0, "p"] = np.nan
        df.loc[df.e == 0, "z"] = np.nan

        df.loc[
            df.geotype.isin(["borough", "city"])
            & df.pff_variable.isin(self.base_variables)
            & df.c.isna(),
            "c",
        ] = 0

        df.loc[
            df.geotype.isin(["borough", "city"])
            & df.pff_variable.isin(self.base_variables)
            & df.m.isna(),
            "m",
        ] = 0

        df.loc[df.pff_variable.isin(self.base_variables), "p"] = 100

        df.loc[df.pff_variable.isin(self.base_variables), "z"] = np.nan

        return df

    def calculate_multiple_e_m(self, pff_variables: list, geotype: str) -> pd.DataFrame:
        """
        given a list of pff_variables, and geotype, calculate multiple
        variables e, m at the same time using multiprocessing
        """
        _calculate_e_m = partial(self.calculate_e_m, geotype=geotype)
        with Pool(5) as download_pool:
            dfs = download_pool.map(_calculate_e_m, pff_variables)
        df = pd.concat(dfs)
        del dfs
        return df

    def calculate_special_e_m(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        Given pff_variable and geotype, download and calculate the variable.
        Used for variables requiring special horizontal aggregation techniques.
        """
        base_variables = self.get_special_base_variables(pff_variable)
        df = self.calculate_multiple_e_m(base_variables, geotype)
        df = self.special_variable_options[pff_variable](df, base_variables)
        df["pff_variable"] = pff_variable
        df["geotype"] = geotype
        return df[["census_geoid", "pff_variable", "geotype", "e", "m"]]

    def calculate_median_e_m(self, pff_variable, geotype) -> pd.DataFrame:
        """
        Given median variable in the form of pff_variable and geotype
        calculate the median and median moe
        """
        # 1. Initialize
        ranges = self.median_ranges(pff_variable)
        design_factor = self.median_design_factor(pff_variable)
        rounding = self.create_variable(pff_variable).rounding

        # 2. Calculate each variable that goes into median calculation
        df = self.calculate_multiple_e_m(list(ranges.keys()), geotype)

        # 3. create a pivot table with census_geoid as the index, and
        # pff_variable as column names. df_pivoted.e -> the estimation dataframe
        df_pivoted = df.loc[:, ["census_geoid", "pff_variable", "e"]].pivot(
            index="census_geoid", columns="pff_variable", values=["e"]
        )

        # Empty dataframe to store the results
        results = pd.DataFrame()
        results["census_geoid"] = df_pivoted.index
        results["pff_variable"] = pff_variable
        results["geotype"] = geotype

        # 4. calculate median estimation using get_median
        results["e"] = (
            df_pivoted.e.loc[
                df_pivoted.e.index == results.census_geoid, list(ranges.keys())
            ]
            .apply(lambda row: get_median(ranges, row), axis=1)
            .to_list()
        )

        # 5. Calculate median moe using get_median_moe
        # Note that median moe calculation needs the median estimation
        # so we seperated df_pivoted.m out as a seperate dataframe
        e = df_pivoted.e.copy()
        e["e"] = results.loc[e.index == results.census_geoid, "e"].to_list()
        results["m"] = (
            e.loc[e.index == results.census_geoid, list(ranges.keys()) + ["e"]]
            .apply(lambda row: get_median_moe(ranges, row, design_factor), axis=1)
            .to_list()
        )

        # 6. return the output, containing the median, and all the variables used
        return results

    def create_variable(self, pff_variable: str) -> Variable:
        """
        given pff_variable name, return a Variable object
        """
        meta = list(filter(lambda x: x["pff_variable"] == pff_variable, self.metadata))[
            0
        ]
        return Variable(meta)

    @lru_cache(maxsize=1048)
    def get_aggregate_vertical(self, source: str, geotype: str):
        """
        this function will aggregate over geographies,
        e.g. aggregate over tracts to get NTA level data
        """
        source = "acs" if source != "decennial" else source
        to_geotype = geotype
        if geotype not in self.aggregated_geography:
            aggregate_vertical = lambda df: df
            from_geotype = geotype
        else:
            options = self.aggregate_vertical_options.get(source)
            for k, v in options.items():
                if geotype in v.keys():
                    from_geotype = k
                    aggregate_vertical = options[k][geotype]
        return from_geotype, aggregate_vertical

    @lru_cache(maxsize=1048)
    def calculate_e_m(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        Given pff_variable and geotype, download and calculate the variable
        """
        cache_path = f".cache/{geotype}/{pff_variable}.pkl"
        if os.path.isfile(cache_path):
            df = pd.read_pickle(cache_path)
        else:
            # 1. create variable
            v = self.create_variable(pff_variable)

            # 2. pulling data from census site and aggregating
            from_geotype, aggregate_vertical = self.get_aggregate_vertical(
                v.source, geotype
            )
            df = self.aggregate_horizontal(v, from_geotype)
            df = aggregate_vertical(df)
            os.makedirs(f".cache/{geotype}", exist_ok=True)
            self.write_to_cache(df, cache_path)
        return df

    def write_to_cache(self, df: pd.DataFrame, path: str):
        if not os.path.isfile(path):
            df.to_pickle(path)
        return None

    def calculate_e_m_p_z(self, pff_variable: str, geotype: str) -> pd.DataFrame:
        """
        This function is used for calculating profile variables only with
        non-aggregated-geography geotype
        """
        # 1. create variable
        v = self.create_variable(pff_variable)
        # hard coding because by definition profile-only
        #  variables only has 1 census variable
        census_variable = v.census_variable[0]
        # 2. pulling data from census site and aggregating
        df = asyncio.run(self.download_variable(self.download_e_m_p_z, v, geotype))
        df["pff_variable"] = pff_variable
        df["geotype"] = geotype
        # 3. Change field names
        columns = {
            census_variable + "E": "e",
            census_variable + "M": "m",
            census_variable + "PE": "p",
            census_variable + "PM": "z",
        }
        df = self.create_census_geoid(df, geotype)
        df = df.rename(columns=columns)
        return df[["census_geoid", "pff_variable", "geotype", "e", "m", "p", "z"]]

    def calculate_poverty_p_z(self, v: Variable, geotype: str) -> pd.DataFrame:
        """
        For below poverty vars, the percent and percent MOE are taken from the ACS,
        but they come from E and M fields, not PE and PM fields. This function
        calculates the E and M for the associated percent variable, then renames as
        P and Z to join with the count variable.
        """
        pct_df = self.calculate_e_m(f"{v.pff_variable}_pct", geotype=geotype)
        pz = pct_df[["census_geoid", "geotype", "e", "m"]].rename(
            columns={"e": "p", "m": "z"}
        )
        return pz

    def calculate_c_e_m_p_z(self, v: Variable, geotype: str) -> pd.DataFrame:
        """
        this function will calculate e, m first, then based on if the
        variable is a median variable or base variable, it would calculate
        p and z accordingly. Note that c calculation is the same for all variables
        """
        output_fields = [
            "census_geoid",
            "pff_variable",
            "geotype",
            "c",
            "e",
            "m",
            "p",
            "z",
        ]
        # If pff_variable is a median variable, then we would need
        # to calculate using calculate_median_e_m for aggregated geography
        # there's no need to calculate p, z for median variables
        if (
            v.pff_variable in self.profile_only_variables
            and geotype not in self.aggregated_geography
        ):
            df = self.calculate_e_m_p_z(v.pff_variable, geotype)

        elif v.pff_variable in self.median_variables:
            df = (
                self.calculate_median_e_m(v.pff_variable, geotype)
                if geotype in self.aggregated_geography
                else self.calculate_e_m(v.pff_variable, geotype)
            )
            df["p"] = 100 if geotype in ["city", "borough"] else np.nan
            df["z"] = np.nan
        else:
            df = (
                self.calculate_e_m(v.pff_variable, geotype)
                if not (
                    (
                        v.pff_variable in self.special_variables
                        and geotype in self.aggregated_geography
                    )
                    or (v.pff_variable == "wrkrnothm")
                )
                # We only calculate special variables for aggregated geographies,
                # with the exception of 'wrkrnothm' (calculate for both aggregated and non-aggregated geographies)
                else self.calculate_special_e_m(v.pff_variable, geotype)
            )
            # If pff_variable is not base_variable, then p,z
            # are calculated against the base variable e(agg_e), m(agg_m)
            if v.pff_variable not in self.base_variables:
                if v.pff_variable in ["pbwpv", "pu18bwpv", "p65plbwpv"]:
                    # special case for poverty variables
                    df_pz = self.calculate_poverty_p_z(v, geotype)
                    df = df.merge(df_pz, on=["census_geoid", "geotype"])
                elif v.base_variable != "nan":
                    if (
                        v.base_variable in self.special_variables
                        and geotype in self.aggregated_geography
                    ):
                        df_base = self.calculate_special_e_m(v.base_variable, geotype)
                    if (
                        v.base_variable in self.median_variables
                        and geotype in self.aggregated_geography
                    ):
                        df_base = self.calculate_median_e_m(v.base_variable, geotype)
                    else:
                        df_base = self.calculate_e_m(v.base_variable, geotype)

                    df = df.merge(
                        df_base[["census_geoid", "e", "m"]].rename(
                            columns={"e": "agg_e", "m": "agg_m"}
                        ),
                        how="left",
                        on="census_geoid",
                    )
                    del df_base
                    df["p"] = df.apply(
                        lambda row: get_p(row["e"], row["agg_e"]), axis=1
                    )
                    df["z"] = df.apply(
                        lambda row: get_z(
                            row["e"], row["m"], row["p"], row["agg_e"], row["agg_m"]
                        ),
                        axis=1,
                    )
                else:
                    # special case for grnorntpd, smpntc,
                    # grpintc, nmsmpntc, cni1864_2, cvlf18t64
                    df["p"] = np.nan
                    df["z"] = np.nan
            # If pff_variable is a base variable, then
            # p = 100 for city and borough level, np.nan otherwise
            # z = np.nan for all levels of geography
            else:
                df["p"] = 100 if geotype in ["city", "borough"] else np.nan
                df["z"] = np.nan

        df["c"] = df.apply(lambda row: get_c(row["e"], row["m"]), axis=1)
        return df[output_fields]

    def create_census_variables(self, census_variable: list) -> (list, list):
        """
        Based on the census variables, spit out the
        M variables and E variables
        e.g. ["B01001_044"] -> ["B01001_044M"], ["B01001_044E"]
        """
        E_variables = [i + "E" for i in census_variable if i[0] != "P"]
        if len(E_variables) == 0:  # Only decennial, pass raw variable name
            E_variables = census_variable
        M_variables = [i + "M" for i in census_variable if i[0] != "P"]
        return E_variables, M_variables

    def aggregate_horizontal(self, v: Variable, geotype: str) -> pd.DataFrame:
        """
        this function will aggregate multiple census_variables into 1 pff_variable
        e.g. ["B01001_044","B01001_020"] -> "mdpop65t66"
        """
        E_variables, M_variables = self.create_census_variables(v.census_variable)
        df = asyncio.run(self.download_variable(self.download_e_m, v, geotype))

        # Aggregate variables horizontally
        df["pff_variable"] = v.pff_variable
        df["geotype"] = geotype
        df["e"] = df[E_variables].sum(axis=1)
        df["m"] = (
            (df[M_variables] ** 2).sum(axis=1) ** 0.5
            if v.source != "decennial"
            else np.nan
        )

        # Create geoid
        df = self.create_census_geoid(df, geotype)

        # Output
        return df[["census_geoid", "pff_variable", "geotype", "e", "m"]]

    def create_census_geoid(self, df: pd.DataFrame, geotype: str) -> pd.DataFrame:
        if geotype == "tract":
            df["census_geoid"] = df["state"] + df["county"] + df["tract"]
        elif geotype == "borough":
            df["census_geoid"] = df["state"] + df["county"]
        elif geotype == "city":
            df["census_geoid"] = df["state"] + df["place"]
        elif geotype == "block":
            df["census_geoid"] = df["state"] + df["county"] + df["tract"] + df["block"]
        elif geotype == "block group":
            df["census_geoid"] = (
                df["state"] + df["county"] + df["tract"] + df["block group"]
            )
        return df

    async def download_variable(
        self, download_function, v: Variable, geotype: str
    ) -> pd.DataFrame:
        """
        Given a list of census_variables, and geotype, download data from acs/decennial api
        Note that depends on if we are taking PE/PM variables directly from census API,
        we will pass in self.download_e_m_p_z (data_profile_only) or self.download_e_m (generic)
        as download_func
        """
        geoqueries = self.get_geoquery(geotype)
        tasks = []
        for gq in geoqueries:
            tasks.append(asyncio.create_task(download_function(gq, v)))
        dfs = await asyncio.gather(*tasks)
        return pd.concat(dfs)

    async def download_e_m_p_z(self, geoquery: dict, v: Variable) -> pd.DataFrame:
        """
        This function is for downloading non-aggregated-geotype and data profile only
        variables. It will return e, m, p, z variables for a single pff variable.
        """
        # single source (data profile) only, so safe to set a default
        client = self.c.acs5dp
        census_variable = v.census_variable[0]
        E_variables = census_variable + "E"
        M_variables = census_variable + "M"
        PE_variables = census_variable + "PE"
        PM_variables = census_variable + "PM"
        variables = [E_variables, M_variables, PE_variables, PM_variables]
        df = pd.DataFrame(
            client.get(("NAME", ",".join(variables)), geoquery, year=self.year)
        )
        # If E is an outlier, then set M as Nan
        for var in variables:  # Enforce type safety
            df[var] = df[var].astype("float64")
        df.loc[df[E_variables].isin(self.outliers), M_variables] = np.nan
        df.loc[df[E_variables] == 0, M_variables] = 0
        # Replace all outliers as Nan
        df = df.replace(self.outliers, np.nan)
        return df

    async def download_e_m(self, geoquery: dict, v: Variable) -> pd.DataFrame:
        """
        this function works in conjunction with download_variable,
        and is only created to facilitate multiprocessing, this function
        if for generic variable calculation, returns e, m
        """
        # Get unique sources
        sources = set([i[0] for i in v.census_variable])
        frames = []
        for source in sources:
            # Create Variables for given source and set client
            variables = [i for i in v.census_variable if i[0] == source]
            client = self.client_options.get(source, self.c.acs5)
            E_variables, M_variables = self.create_census_variables(variables)
            frames.append(
                pd.DataFrame(
                    client.get(
                        ("NAME", ",".join(E_variables + M_variables)),
                        geoquery,
                        year=self.year,
                    )
                )
            )
        # Combine results from each source by joining on geo name
        df = frames[0]
        for i in frames[1:]:
            df = pd.merge(
                df,
                i[i.columns.difference(["state", "county", "tract", "place"])],
                left_on="NAME",
                right_on="NAME",
            )
        del frames
        # Enforce type safety
        for i in v.census_variable:
            if i[0] != "P":
                df[f"{i}E"] = df[f"{i}E"].astype("float64")
                df[f"{i}M"] = df[f"{i}M"].astype("float64")
                # If E is zero, then set M as zero
                df.loc[df[f"{i}E"] == 0, f"{i}M"] = 0
                # If E is an outlier, then set M as Nan
                df.loc[df[f"{i}E"].isin(self.outliers), f"{i}M"] = np.nan
            else:
                df[i] = df[i].astype("float64")
        # Replace all outliers as Nan
        df = df.replace(self.outliers, np.nan)
        return df

    def get_geoquery(self, geotype: str) -> list:
        """
        given geotype, this function will create a list of geographic queries
        we would need to pull NYC level data.
        """
        if geotype == "tract":
            return [
                {"for": "tract:*", "in": f"state:{self.state} county:{county}"}
                for county in self.counties
            ]
        elif geotype == "borough":
            return [
                {"for": f"county:{county}", "in": f"state:{self.state}"}
                for county in self.counties
            ]
        elif geotype == "city":
            return [{"for": "place:51000", "in": f"state:{self.state}"}]

        elif geotype == "block":
            return [
                {"for": "block:*", "in": f"state:{self.state} county:{county}"}
                for county in self.counties
            ]

        elif geotype in "block group":
            return [
                {"for": "block group:*", "in": f"state:{self.state} county:{county}"}
                for county in self.counties
            ]

Instance variables

var aggregated_geography
Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value
var base_variables

returns a list of base variables in the format of pff_variable

Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value
var median
Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value
var median_variables

returns a list of median variables in the format of pff_variable

Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value
var metadata
Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value
var profile_only_variables
Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value
var special
Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value
var special_variables

returns a list of special calculation variables in the format of pff_variable

Expand source code
def __get__(self, obj, cls):
    if obj is None:
        return self

    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    value = obj.__dict__[self.func.__name__] = self.func(obj)
    return value

Methods

def aggregate_horizontal(self, v: Variable, geotype: str) ‑> pandas.core.frame.DataFrame

this function will aggregate multiple census_variables into 1 pff_variable e.g. ["B01001_044","B01001_020"] -> "mdpop65t66"

Expand source code
def aggregate_horizontal(self, v: Variable, geotype: str) -> pd.DataFrame:
    """
    this function will aggregate multiple census_variables into 1 pff_variable
    e.g. ["B01001_044","B01001_020"] -> "mdpop65t66"
    """
    E_variables, M_variables = self.create_census_variables(v.census_variable)
    df = asyncio.run(self.download_variable(self.download_e_m, v, geotype))

    # Aggregate variables horizontally
    df["pff_variable"] = v.pff_variable
    df["geotype"] = geotype
    df["e"] = df[E_variables].sum(axis=1)
    df["m"] = (
        (df[M_variables] ** 2).sum(axis=1) ** 0.5
        if v.source != "decennial"
        else np.nan
    )

    # Create geoid
    df = self.create_census_geoid(df, geotype)

    # Output
    return df[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def calculate(self, pff_variable: str, geotype: str) ‑> pandas.core.frame.DataFrame

Main user interface, going through different following steps: 1. calculate_c_e_m_p_z 2. rounding 3. assigning geoid

Expand source code
def calculate(self, pff_variable: str, geotype: str) -> pd.DataFrame:
    """
    Main user interface, going through different following steps:
    1. calculate_c_e_m_p_z
    2. rounding
    3. assigning geoid
    """
    # 0. Initialize Variable class instance
    v = self.create_variable(pff_variable)
    # 1. get calculated values (c,e,m,p,z)
    df = self.calculate_c_e_m_p_z(v, geotype)
    # 2. rounding
    df = self.rounding(df, v.rounding)
    # 3. last round of data cleaning
    df = self.cleaning(df, v, geotype)
    return df
def calculate_c_e_m_p_z(self, v: Variable, geotype: str) ‑> pandas.core.frame.DataFrame

this function will calculate e, m first, then based on if the variable is a median variable or base variable, it would calculate p and z accordingly. Note that c calculation is the same for all variables

Expand source code
def calculate_c_e_m_p_z(self, v: Variable, geotype: str) -> pd.DataFrame:
    """
    this function will calculate e, m first, then based on if the
    variable is a median variable or base variable, it would calculate
    p and z accordingly. Note that c calculation is the same for all variables
    """
    output_fields = [
        "census_geoid",
        "pff_variable",
        "geotype",
        "c",
        "e",
        "m",
        "p",
        "z",
    ]
    # If pff_variable is a median variable, then we would need
    # to calculate using calculate_median_e_m for aggregated geography
    # there's no need to calculate p, z for median variables
    if (
        v.pff_variable in self.profile_only_variables
        and geotype not in self.aggregated_geography
    ):
        df = self.calculate_e_m_p_z(v.pff_variable, geotype)

    elif v.pff_variable in self.median_variables:
        df = (
            self.calculate_median_e_m(v.pff_variable, geotype)
            if geotype in self.aggregated_geography
            else self.calculate_e_m(v.pff_variable, geotype)
        )
        df["p"] = 100 if geotype in ["city", "borough"] else np.nan
        df["z"] = np.nan
    else:
        df = (
            self.calculate_e_m(v.pff_variable, geotype)
            if not (
                (
                    v.pff_variable in self.special_variables
                    and geotype in self.aggregated_geography
                )
                or (v.pff_variable == "wrkrnothm")
            )
            # We only calculate special variables for aggregated geographies,
            # with the exception of 'wrkrnothm' (calculate for both aggregated and non-aggregated geographies)
            else self.calculate_special_e_m(v.pff_variable, geotype)
        )
        # If pff_variable is not base_variable, then p,z
        # are calculated against the base variable e(agg_e), m(agg_m)
        if v.pff_variable not in self.base_variables:
            if v.pff_variable in ["pbwpv", "pu18bwpv", "p65plbwpv"]:
                # special case for poverty variables
                df_pz = self.calculate_poverty_p_z(v, geotype)
                df = df.merge(df_pz, on=["census_geoid", "geotype"])
            elif v.base_variable != "nan":
                if (
                    v.base_variable in self.special_variables
                    and geotype in self.aggregated_geography
                ):
                    df_base = self.calculate_special_e_m(v.base_variable, geotype)
                if (
                    v.base_variable in self.median_variables
                    and geotype in self.aggregated_geography
                ):
                    df_base = self.calculate_median_e_m(v.base_variable, geotype)
                else:
                    df_base = self.calculate_e_m(v.base_variable, geotype)

                df = df.merge(
                    df_base[["census_geoid", "e", "m"]].rename(
                        columns={"e": "agg_e", "m": "agg_m"}
                    ),
                    how="left",
                    on="census_geoid",
                )
                del df_base
                df["p"] = df.apply(
                    lambda row: get_p(row["e"], row["agg_e"]), axis=1
                )
                df["z"] = df.apply(
                    lambda row: get_z(
                        row["e"], row["m"], row["p"], row["agg_e"], row["agg_m"]
                    ),
                    axis=1,
                )
            else:
                # special case for grnorntpd, smpntc,
                # grpintc, nmsmpntc, cni1864_2, cvlf18t64
                df["p"] = np.nan
                df["z"] = np.nan
        # If pff_variable is a base variable, then
        # p = 100 for city and borough level, np.nan otherwise
        # z = np.nan for all levels of geography
        else:
            df["p"] = 100 if geotype in ["city", "borough"] else np.nan
            df["z"] = np.nan

    df["c"] = df.apply(lambda row: get_c(row["e"], row["m"]), axis=1)
    return df[output_fields]
def calculate_e_m(self, pff_variable: str, geotype: str) ‑> pandas.core.frame.DataFrame

Given pff_variable and geotype, download and calculate the variable

Expand source code
@lru_cache(maxsize=1048)
def calculate_e_m(self, pff_variable: str, geotype: str) -> pd.DataFrame:
    """
    Given pff_variable and geotype, download and calculate the variable
    """
    cache_path = f".cache/{geotype}/{pff_variable}.pkl"
    if os.path.isfile(cache_path):
        df = pd.read_pickle(cache_path)
    else:
        # 1. create variable
        v = self.create_variable(pff_variable)

        # 2. pulling data from census site and aggregating
        from_geotype, aggregate_vertical = self.get_aggregate_vertical(
            v.source, geotype
        )
        df = self.aggregate_horizontal(v, from_geotype)
        df = aggregate_vertical(df)
        os.makedirs(f".cache/{geotype}", exist_ok=True)
        self.write_to_cache(df, cache_path)
    return df
def calculate_e_m_p_z(self, pff_variable: str, geotype: str) ‑> pandas.core.frame.DataFrame

This function is used for calculating profile variables only with non-aggregated-geography geotype

Expand source code
def calculate_e_m_p_z(self, pff_variable: str, geotype: str) -> pd.DataFrame:
    """
    This function is used for calculating profile variables only with
    non-aggregated-geography geotype
    """
    # 1. create variable
    v = self.create_variable(pff_variable)
    # hard coding because by definition profile-only
    #  variables only has 1 census variable
    census_variable = v.census_variable[0]
    # 2. pulling data from census site and aggregating
    df = asyncio.run(self.download_variable(self.download_e_m_p_z, v, geotype))
    df["pff_variable"] = pff_variable
    df["geotype"] = geotype
    # 3. Change field names
    columns = {
        census_variable + "E": "e",
        census_variable + "M": "m",
        census_variable + "PE": "p",
        census_variable + "PM": "z",
    }
    df = self.create_census_geoid(df, geotype)
    df = df.rename(columns=columns)
    return df[["census_geoid", "pff_variable", "geotype", "e", "m", "p", "z"]]
def calculate_median_e_m(self, pff_variable, geotype) ‑> pandas.core.frame.DataFrame

Given median variable in the form of pff_variable and geotype calculate the median and median moe

Expand source code
def calculate_median_e_m(self, pff_variable, geotype) -> pd.DataFrame:
    """
    Given median variable in the form of pff_variable and geotype
    calculate the median and median moe
    """
    # 1. Initialize
    ranges = self.median_ranges(pff_variable)
    design_factor = self.median_design_factor(pff_variable)
    rounding = self.create_variable(pff_variable).rounding

    # 2. Calculate each variable that goes into median calculation
    df = self.calculate_multiple_e_m(list(ranges.keys()), geotype)

    # 3. create a pivot table with census_geoid as the index, and
    # pff_variable as column names. df_pivoted.e -> the estimation dataframe
    df_pivoted = df.loc[:, ["census_geoid", "pff_variable", "e"]].pivot(
        index="census_geoid", columns="pff_variable", values=["e"]
    )

    # Empty dataframe to store the results
    results = pd.DataFrame()
    results["census_geoid"] = df_pivoted.index
    results["pff_variable"] = pff_variable
    results["geotype"] = geotype

    # 4. calculate median estimation using get_median
    results["e"] = (
        df_pivoted.e.loc[
            df_pivoted.e.index == results.census_geoid, list(ranges.keys())
        ]
        .apply(lambda row: get_median(ranges, row), axis=1)
        .to_list()
    )

    # 5. Calculate median moe using get_median_moe
    # Note that median moe calculation needs the median estimation
    # so we seperated df_pivoted.m out as a seperate dataframe
    e = df_pivoted.e.copy()
    e["e"] = results.loc[e.index == results.census_geoid, "e"].to_list()
    results["m"] = (
        e.loc[e.index == results.census_geoid, list(ranges.keys()) + ["e"]]
        .apply(lambda row: get_median_moe(ranges, row, design_factor), axis=1)
        .to_list()
    )

    # 6. return the output, containing the median, and all the variables used
    return results
def calculate_multiple_e_m(self, pff_variables: list, geotype: str) ‑> pandas.core.frame.DataFrame

given a list of pff_variables, and geotype, calculate multiple variables e, m at the same time using multiprocessing

Expand source code
def calculate_multiple_e_m(self, pff_variables: list, geotype: str) -> pd.DataFrame:
    """
    given a list of pff_variables, and geotype, calculate multiple
    variables e, m at the same time using multiprocessing
    """
    _calculate_e_m = partial(self.calculate_e_m, geotype=geotype)
    with Pool(5) as download_pool:
        dfs = download_pool.map(_calculate_e_m, pff_variables)
    df = pd.concat(dfs)
    del dfs
    return df
def calculate_poverty_p_z(self, v: Variable, geotype: str) ‑> pandas.core.frame.DataFrame

For below poverty vars, the percent and percent MOE are taken from the ACS, but they come from E and M fields, not PE and PM fields. This function calculates the E and M for the associated percent variable, then renames as P and Z to join with the count variable.

Expand source code
def calculate_poverty_p_z(self, v: Variable, geotype: str) -> pd.DataFrame:
    """
    For below poverty vars, the percent and percent MOE are taken from the ACS,
    but they come from E and M fields, not PE and PM fields. This function
    calculates the E and M for the associated percent variable, then renames as
    P and Z to join with the count variable.
    """
    pct_df = self.calculate_e_m(f"{v.pff_variable}_pct", geotype=geotype)
    pz = pct_df[["census_geoid", "geotype", "e", "m"]].rename(
        columns={"e": "p", "m": "z"}
    )
    return pz
def calculate_special_e_m(self, pff_variable: str, geotype: str) ‑> pandas.core.frame.DataFrame

Given pff_variable and geotype, download and calculate the variable. Used for variables requiring special horizontal aggregation techniques.

Expand source code
def calculate_special_e_m(self, pff_variable: str, geotype: str) -> pd.DataFrame:
    """
    Given pff_variable and geotype, download and calculate the variable.
    Used for variables requiring special horizontal aggregation techniques.
    """
    base_variables = self.get_special_base_variables(pff_variable)
    df = self.calculate_multiple_e_m(base_variables, geotype)
    df = self.special_variable_options[pff_variable](df, base_variables)
    df["pff_variable"] = pff_variable
    df["geotype"] = geotype
    return df[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def cleaning(self, df: pandas.core.frame.DataFrame, v: Variable, geotype: str) ‑> pandas.core.frame.DataFrame

last step data cleaning based on rules below:

Expand source code
def cleaning(self, df: pd.DataFrame, v: Variable, geotype: str) -> pd.DataFrame:
    """
    last step data cleaning based on rules below:
    """
    # negative values are invalid
    df.loc[df.c < 0, "c"] = np.nan
    df.loc[df.e < 0, "e"] = np.nan
    df.loc[df.m < 0, "m"] = np.nan
    df.loc[df.p < 0, "p"] = np.nan
    df.loc[df.z < 0, "z"] = np.nan

    # p has to be less or equal to 100
    df.loc[df.p > 100, "p"] = np.nan

    # If p = np.nan/, then z = np.nan
    df.loc[(df.p.isna()) | (df.p == 100), "z"] = np.nan

    df.loc[df.e == 0, "c"] = np.nan
    df.loc[df.e == 0 & ~df.pff_variable.isin(self.base_variables), "m"] = np.nan
    df.loc[
        df.e == 0 & df.pff_variable.isin(self.base_variables) & df.m.isna(), "m"
    ] = 0
    df.loc[df.e == 0, "p"] = np.nan
    df.loc[df.e == 0, "z"] = np.nan

    df.loc[
        df.geotype.isin(["borough", "city"])
        & df.pff_variable.isin(self.base_variables)
        & df.c.isna(),
        "c",
    ] = 0

    df.loc[
        df.geotype.isin(["borough", "city"])
        & df.pff_variable.isin(self.base_variables)
        & df.m.isna(),
        "m",
    ] = 0

    df.loc[df.pff_variable.isin(self.base_variables), "p"] = 100

    df.loc[df.pff_variable.isin(self.base_variables), "z"] = np.nan

    return df
def create_census_geoid(self, df: pandas.core.frame.DataFrame, geotype: str) ‑> pandas.core.frame.DataFrame
Expand source code
def create_census_geoid(self, df: pd.DataFrame, geotype: str) -> pd.DataFrame:
    if geotype == "tract":
        df["census_geoid"] = df["state"] + df["county"] + df["tract"]
    elif geotype == "borough":
        df["census_geoid"] = df["state"] + df["county"]
    elif geotype == "city":
        df["census_geoid"] = df["state"] + df["place"]
    elif geotype == "block":
        df["census_geoid"] = df["state"] + df["county"] + df["tract"] + df["block"]
    elif geotype == "block group":
        df["census_geoid"] = (
            df["state"] + df["county"] + df["tract"] + df["block group"]
        )
    return df
def create_census_variables(self, census_variable: list) ‑> ()

Based on the census variables, spit out the M variables and E variables e.g. ["B01001_044"] -> ["B01001_044M"], ["B01001_044E"]

Expand source code
def create_census_variables(self, census_variable: list) -> (list, list):
    """
    Based on the census variables, spit out the
    M variables and E variables
    e.g. ["B01001_044"] -> ["B01001_044M"], ["B01001_044E"]
    """
    E_variables = [i + "E" for i in census_variable if i[0] != "P"]
    if len(E_variables) == 0:  # Only decennial, pass raw variable name
        E_variables = census_variable
    M_variables = [i + "M" for i in census_variable if i[0] != "P"]
    return E_variables, M_variables
def create_variable(self, pff_variable: str) ‑> Variable

given pff_variable name, return a Variable object

Expand source code
def create_variable(self, pff_variable: str) -> Variable:
    """
    given pff_variable name, return a Variable object
    """
    meta = list(filter(lambda x: x["pff_variable"] == pff_variable, self.metadata))[
        0
    ]
    return Variable(meta)
async def download_e_m(self, geoquery: dict, v: Variable) ‑> pandas.core.frame.DataFrame

this function works in conjunction with download_variable, and is only created to facilitate multiprocessing, this function if for generic variable calculation, returns e, m

Expand source code
async def download_e_m(self, geoquery: dict, v: Variable) -> pd.DataFrame:
    """
    this function works in conjunction with download_variable,
    and is only created to facilitate multiprocessing, this function
    if for generic variable calculation, returns e, m
    """
    # Get unique sources
    sources = set([i[0] for i in v.census_variable])
    frames = []
    for source in sources:
        # Create Variables for given source and set client
        variables = [i for i in v.census_variable if i[0] == source]
        client = self.client_options.get(source, self.c.acs5)
        E_variables, M_variables = self.create_census_variables(variables)
        frames.append(
            pd.DataFrame(
                client.get(
                    ("NAME", ",".join(E_variables + M_variables)),
                    geoquery,
                    year=self.year,
                )
            )
        )
    # Combine results from each source by joining on geo name
    df = frames[0]
    for i in frames[1:]:
        df = pd.merge(
            df,
            i[i.columns.difference(["state", "county", "tract", "place"])],
            left_on="NAME",
            right_on="NAME",
        )
    del frames
    # Enforce type safety
    for i in v.census_variable:
        if i[0] != "P":
            df[f"{i}E"] = df[f"{i}E"].astype("float64")
            df[f"{i}M"] = df[f"{i}M"].astype("float64")
            # If E is zero, then set M as zero
            df.loc[df[f"{i}E"] == 0, f"{i}M"] = 0
            # If E is an outlier, then set M as Nan
            df.loc[df[f"{i}E"].isin(self.outliers), f"{i}M"] = np.nan
        else:
            df[i] = df[i].astype("float64")
    # Replace all outliers as Nan
    df = df.replace(self.outliers, np.nan)
    return df
async def download_e_m_p_z(self, geoquery: dict, v: Variable) ‑> pandas.core.frame.DataFrame

This function is for downloading non-aggregated-geotype and data profile only variables. It will return e, m, p, z variables for a single pff variable.

Expand source code
async def download_e_m_p_z(self, geoquery: dict, v: Variable) -> pd.DataFrame:
    """
    This function is for downloading non-aggregated-geotype and data profile only
    variables. It will return e, m, p, z variables for a single pff variable.
    """
    # single source (data profile) only, so safe to set a default
    client = self.c.acs5dp
    census_variable = v.census_variable[0]
    E_variables = census_variable + "E"
    M_variables = census_variable + "M"
    PE_variables = census_variable + "PE"
    PM_variables = census_variable + "PM"
    variables = [E_variables, M_variables, PE_variables, PM_variables]
    df = pd.DataFrame(
        client.get(("NAME", ",".join(variables)), geoquery, year=self.year)
    )
    # If E is an outlier, then set M as Nan
    for var in variables:  # Enforce type safety
        df[var] = df[var].astype("float64")
    df.loc[df[E_variables].isin(self.outliers), M_variables] = np.nan
    df.loc[df[E_variables] == 0, M_variables] = 0
    # Replace all outliers as Nan
    df = df.replace(self.outliers, np.nan)
    return df
async def download_variable(self, download_function, v: Variable, geotype: str) ‑> pandas.core.frame.DataFrame

Given a list of census_variables, and geotype, download data from acs/decennial api Note that depends on if we are taking PE/PM variables directly from census API, we will pass in self.download_e_m_p_z (data_profile_only) or self.download_e_m (generic) as download_func

Expand source code
async def download_variable(
    self, download_function, v: Variable, geotype: str
) -> pd.DataFrame:
    """
    Given a list of census_variables, and geotype, download data from acs/decennial api
    Note that depends on if we are taking PE/PM variables directly from census API,
    we will pass in self.download_e_m_p_z (data_profile_only) or self.download_e_m (generic)
    as download_func
    """
    geoqueries = self.get_geoquery(geotype)
    tasks = []
    for gq in geoqueries:
        tasks.append(asyncio.create_task(download_function(gq, v)))
    dfs = await asyncio.gather(*tasks)
    return pd.concat(dfs)
def get_aggregate_vertical(self, source: str, geotype: str)

this function will aggregate over geographies, e.g. aggregate over tracts to get NTA level data

Expand source code
@lru_cache(maxsize=1048)
def get_aggregate_vertical(self, source: str, geotype: str):
    """
    this function will aggregate over geographies,
    e.g. aggregate over tracts to get NTA level data
    """
    source = "acs" if source != "decennial" else source
    to_geotype = geotype
    if geotype not in self.aggregated_geography:
        aggregate_vertical = lambda df: df
        from_geotype = geotype
    else:
        options = self.aggregate_vertical_options.get(source)
        for k, v in options.items():
            if geotype in v.keys():
                from_geotype = k
                aggregate_vertical = options[k][geotype]
    return from_geotype, aggregate_vertical
def get_geoquery(self, geotype: str) ‑> list

given geotype, this function will create a list of geographic queries we would need to pull NYC level data.

Expand source code
def get_geoquery(self, geotype: str) -> list:
    """
    given geotype, this function will create a list of geographic queries
    we would need to pull NYC level data.
    """
    if geotype == "tract":
        return [
            {"for": "tract:*", "in": f"state:{self.state} county:{county}"}
            for county in self.counties
        ]
    elif geotype == "borough":
        return [
            {"for": f"county:{county}", "in": f"state:{self.state}"}
            for county in self.counties
        ]
    elif geotype == "city":
        return [{"for": "place:51000", "in": f"state:{self.state}"}]

    elif geotype == "block":
        return [
            {"for": "block:*", "in": f"state:{self.state} county:{county}"}
            for county in self.counties
        ]

    elif geotype in "block group":
        return [
            {"for": "block group:*", "in": f"state:{self.state} county:{county}"}
            for county in self.counties
        ]
def get_special_base_variables(self, pff_variable) ‑> list

returns a list of special calculation base variables in the format of pff_variable

Expand source code
def get_special_base_variables(self, pff_variable) -> list:
    """
    returns a list of special calculation base variables in the format
    of pff_variable
    """
    special = list(
        filter(lambda x: x["pff_variable"] == pff_variable, self.special)
    )
    return special[0]["base_variables"]
def median_design_factor(self, pff_variable) ‑> float

given median variable in the form of pff_variable returns the design_factor needed to calculate the median moe

Expand source code
def median_design_factor(self, pff_variable) -> float:
    """
    given median variable in the form of pff_variable
    returns the design_factor needed to calculate the
    median moe
    """
    return self.median[pff_variable]["design_factor"]
def median_ranges(self, pff_variable) ‑> dict

given median variable in the format of pff_variable returns the ranges object for the median variable. e.g. { 'mdpop0t4': [0, 4.9999], 'mdpop5t9': [5, 9.9999], … }

Expand source code
def median_ranges(self, pff_variable) -> dict:
    """
    given median variable in the format of pff_variable
    returns the ranges object for the median variable.
    e.g.
    {
        'mdpop0t4': [0, 4.9999],
        'mdpop5t9': [5, 9.9999],
        ...
    }
    """
    return self.median[pff_variable]["ranges"]
def rounding(self, df: pandas.core.frame.DataFrame, digits: int) ‑> pandas.core.frame.DataFrame

Round c, e, m, p, z fields based on rounding digits from metadata

Expand source code
def rounding(self, df: pd.DataFrame, digits: int) -> pd.DataFrame:
    """
    Round c, e, m, p, z fields based on rounding digits from metadata
    """
    df["c"] = df["c"].round(1)
    df["e"] = df["e"].round(digits)
    df["m"] = df["m"].round(digits)
    df["p"] = df["p"].round(1)
    df["z"] = df["z"].round(1)
    return df
def write_to_cache(self, df: pandas.core.frame.DataFrame, path: str)
Expand source code
def write_to_cache(self, df: pd.DataFrame, path: str):
    if not os.path.isfile(path):
        df.to_pickle(path)
    return None