Module factfinder.aggregated_geography

Expand source code
import math
from pathlib import Path

import numpy as np
import pandas as pd
from cached_property import cached_property


class AggregatedGeography:
    """
    Roll census data up from small geographies (tract, block group, block)
    to larger ones using the packaged ``lookup_geo`` crosswalk.

    Every aggregation method takes a DataFrame with the columns
    ``census_geoid``, ``pff_variable``, ``e`` and ``m`` and returns one row
    per target geography with the columns ``census_geoid`` (now holding the
    target geography's id), ``pff_variable``, ``geotype``, ``e`` and ``m``.
    """

    def __init__(self, year=2018):
        # Only used to pick which decennial crosswalk file to load.
        self.year = year

    @cached_property
    def lookup_geo(self):
        """
        Crosswalk table, read once per instance with every column as string.

        Derived columns added on top of the CSV:

        * ``geoid_block``       -- ``county_fips + ctcb2010``
        * ``geoid_block_group`` -- first 12 characters of ``geoid_block``
        * ``geoid_tract``       -- ``county_fips + ct2010``
        * ``cd_fp_500`` / ``cd_fp_100`` / ``cd_park_access`` -- the ``cd``
          value where the matching flag column is non-zero, NaN otherwise
        """
        # find the current decennial year based on given year
        decennial_year = self.year // 10 * 10
        lookup_geo = pd.read_csv(
            Path(__file__).parent
            / "data"
            / "lookup_geo"
            / str(decennial_year)
            / "lookup_geo.csv",
            dtype="str",
        )
        lookup_geo["geoid_block"] = lookup_geo.county_fips + lookup_geo.ctcb2010
        lookup_geo["geoid_block_group"] = lookup_geo.geoid_block.str[:12]
        lookup_geo["geoid_tract"] = lookup_geo.county_fips + lookup_geo.ct2010
        # Vectorised replacement for a row-wise apply: keep `cd` where the
        # flag parses to a non-zero integer, NaN everywhere else.
        for flag in ("fp_500", "fp_100", "park_access"):
            in_zone = lookup_geo[flag].astype(int).astype(bool)
            lookup_geo[f"cd_{flag}"] = lookup_geo["cd"].where(in_zone)
        return lookup_geo

    @staticmethod
    def agg_moe(x):
        """
        Combine the values in ``x`` as the square root of the sum of their
        squares. Returns 0.0 for an empty iterable; a NaN in ``x`` makes the
        result NaN.
        """
        return math.sqrt(sum(i ** 2 for i in x))

    @staticmethod
    def create_output(df, colname):
        """
        this function will calculate the aggregated e and m
        given colname we would like to aggregate over

        ``e`` is summed and ``m`` is combined with ``agg_moe`` for each
        distinct value of ``colname``; the grouping column is returned under
        the name ``census_geoid``. Rows whose ``colname`` is NaN are dropped
        by the groupby.
        """
        return (
            df.groupby(colname)
            .agg(e=("e", "sum"), m=("m", AggregatedGeography.agg_moe))
            .reset_index()
            .rename(columns={colname: "census_geoid"})
        )

    def _aggregate(self, df, geoid_col, target_col, geotype, how, assigned_only=False):
        """
        Shared implementation behind every public aggregation method.

        Joins ``df.census_geoid`` to ``lookup_geo[geoid_col]`` using ``how``,
        aggregates ``e``/``m`` per ``target_col`` and labels the result with
        ``geotype``. With ``assigned_only`` the crosswalk is first restricted
        to rows whose ``target_col`` is not NaN.

        Raises IndexError when ``df`` has no rows.
        """
        # Read the variable name from the input, not from the joined frame:
        # a right join adds an all-NaN row for every crosswalk geography
        # missing from `df`, and such a row can come first.
        pff_variable = df["pff_variable"].iloc[0]

        crosswalk = self.lookup_geo[[geoid_col, target_col]]
        if assigned_only:
            crosswalk = crosswalk.dropna(subset=[target_col])

        merged = df.merge(
            crosswalk.drop_duplicates(),
            how=how,
            left_on="census_geoid",
            right_on=geoid_col,
        )
        output = self.create_output(merged, target_col)
        output["pff_variable"] = pff_variable
        output["geotype"] = geotype
        return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]

    def tract_to_nta(self, df):
        """
        tract to nta
        """
        return self._aggregate(df, "geoid_tract", "nta", "NTA", how="left")

    def block_group_to_cd_fp500(self, df):
        """
        500 yr flood plain aggregation for block group data (ACS)
        """
        return self._aggregate(
            df, "geoid_block_group", "cd_fp_500", "cd_fp_500",
            how="right", assigned_only=True,
        )

    def block_group_to_cd_fp100(self, df):
        """
        100 yr flood plain aggregation for block group data (ACS)
        """
        return self._aggregate(
            df, "geoid_block_group", "cd_fp_100", "cd_fp_100",
            how="right", assigned_only=True,
        )

    def block_group_to_cd_park_access(self, df):
        """
        walk-to-park access zone aggregation for block group data (ACS)
        """
        return self._aggregate(
            df, "geoid_block_group", "cd_park_access", "cd_park_access",
            how="right", assigned_only=True,
        )

    def block_to_cd_fp500(self, df):
        """
        500 yr flood plain aggregation for block data (decennial)
        """
        return self._aggregate(
            df, "geoid_block", "cd_fp_500", "cd_fp_500",
            how="right", assigned_only=True,
        )

    def block_to_cd_fp100(self, df):
        """
        100 yr flood plain aggregation for block data (decennial)
        """
        return self._aggregate(
            df, "geoid_block", "cd_fp_100", "cd_fp_100",
            how="right", assigned_only=True,
        )

    def block_to_cd_park_access(self, df):
        """
        walk-to-park access zone aggregation for block data (decennial)
        """
        return self._aggregate(
            df, "geoid_block", "cd_park_access", "cd_park_access",
            how="right", assigned_only=True,
        )

    def tract_to_cd(self, df):
        """
        tract to cd
        """
        return self._aggregate(df, "geoid_tract", "cd", "cd", how="left")

    @cached_property
    def aggregate_vertical_options(self):
        """
        Dispatch table: source name -> input geography -> target geotype ->
        the bound method that performs that aggregation.
        """
        return {
            "decennial": {
                "tract": {"NTA": self.tract_to_nta, "cd": self.tract_to_cd},
                "block": {
                    "cd_fp_500": self.block_to_cd_fp500,
                    "cd_fp_100": self.block_to_cd_fp100,
                    "cd_park_access": self.block_to_cd_park_access,
                },
            },
            "acs": {
                "tract": {"NTA": self.tract_to_nta, "cd": self.tract_to_cd},
                "block group": {
                    "cd_fp_500": self.block_group_to_cd_fp500,
                    "cd_fp_100": self.block_group_to_cd_fp100,
                    "cd_park_access": self.block_group_to_cd_park_access,
                },
            },
        }

Classes

class AggregatedGeography (year=2018)
Expand source code
class AggregatedGeography:
    """
    Roll census data up from small geographies (tract, block group, block)
    to larger ones using the packaged ``lookup_geo`` crosswalk.

    Every aggregation method takes a DataFrame with the columns
    ``census_geoid``, ``pff_variable``, ``e`` and ``m`` and returns one row
    per target geography with the columns ``census_geoid`` (now holding the
    target geography's id), ``pff_variable``, ``geotype``, ``e`` and ``m``.
    """

    def __init__(self, year=2018):
        # Only used to pick which decennial crosswalk file to load.
        self.year = year

    @cached_property
    def lookup_geo(self):
        """
        Crosswalk table, read once per instance with every column as string.

        Derived columns added on top of the CSV:

        * ``geoid_block``       -- ``county_fips + ctcb2010``
        * ``geoid_block_group`` -- first 12 characters of ``geoid_block``
        * ``geoid_tract``       -- ``county_fips + ct2010``
        * ``cd_fp_500`` / ``cd_fp_100`` / ``cd_park_access`` -- the ``cd``
          value where the matching flag column is non-zero, NaN otherwise
        """
        # find the current decennial year based on given year
        decennial_year = self.year // 10 * 10
        lookup_geo = pd.read_csv(
            Path(__file__).parent
            / "data"
            / "lookup_geo"
            / str(decennial_year)
            / "lookup_geo.csv",
            dtype="str",
        )
        lookup_geo["geoid_block"] = lookup_geo.county_fips + lookup_geo.ctcb2010
        lookup_geo["geoid_block_group"] = lookup_geo.geoid_block.str[:12]
        lookup_geo["geoid_tract"] = lookup_geo.county_fips + lookup_geo.ct2010
        # Vectorised replacement for a row-wise apply: keep `cd` where the
        # flag parses to a non-zero integer, NaN everywhere else.
        for flag in ("fp_500", "fp_100", "park_access"):
            in_zone = lookup_geo[flag].astype(int).astype(bool)
            lookup_geo[f"cd_{flag}"] = lookup_geo["cd"].where(in_zone)
        return lookup_geo

    @staticmethod
    def agg_moe(x):
        """
        Combine the values in ``x`` as the square root of the sum of their
        squares. Returns 0.0 for an empty iterable; a NaN in ``x`` makes the
        result NaN.
        """
        return math.sqrt(sum(i ** 2 for i in x))

    @staticmethod
    def create_output(df, colname):
        """
        this function will calculate the aggregated e and m
        given colname we would like to aggregate over

        ``e`` is summed and ``m`` is combined with ``agg_moe`` for each
        distinct value of ``colname``; the grouping column is returned under
        the name ``census_geoid``. Rows whose ``colname`` is NaN are dropped
        by the groupby.
        """
        return (
            df.groupby(colname)
            .agg(e=("e", "sum"), m=("m", AggregatedGeography.agg_moe))
            .reset_index()
            .rename(columns={colname: "census_geoid"})
        )

    def _aggregate(self, df, geoid_col, target_col, geotype, how, assigned_only=False):
        """
        Shared implementation behind every public aggregation method.

        Joins ``df.census_geoid`` to ``lookup_geo[geoid_col]`` using ``how``,
        aggregates ``e``/``m`` per ``target_col`` and labels the result with
        ``geotype``. With ``assigned_only`` the crosswalk is first restricted
        to rows whose ``target_col`` is not NaN.

        Raises IndexError when ``df`` has no rows.
        """
        # Read the variable name from the input, not from the joined frame:
        # a right join adds an all-NaN row for every crosswalk geography
        # missing from `df`, and such a row can come first.
        pff_variable = df["pff_variable"].iloc[0]

        crosswalk = self.lookup_geo[[geoid_col, target_col]]
        if assigned_only:
            crosswalk = crosswalk.dropna(subset=[target_col])

        merged = df.merge(
            crosswalk.drop_duplicates(),
            how=how,
            left_on="census_geoid",
            right_on=geoid_col,
        )
        output = self.create_output(merged, target_col)
        output["pff_variable"] = pff_variable
        output["geotype"] = geotype
        return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]

    def tract_to_nta(self, df):
        """
        tract to nta
        """
        return self._aggregate(df, "geoid_tract", "nta", "NTA", how="left")

    def block_group_to_cd_fp500(self, df):
        """
        500 yr flood plain aggregation for block group data (ACS)
        """
        return self._aggregate(
            df, "geoid_block_group", "cd_fp_500", "cd_fp_500",
            how="right", assigned_only=True,
        )

    def block_group_to_cd_fp100(self, df):
        """
        100 yr flood plain aggregation for block group data (ACS)
        """
        return self._aggregate(
            df, "geoid_block_group", "cd_fp_100", "cd_fp_100",
            how="right", assigned_only=True,
        )

    def block_group_to_cd_park_access(self, df):
        """
        walk-to-park access zone aggregation for block group data (ACS)
        """
        return self._aggregate(
            df, "geoid_block_group", "cd_park_access", "cd_park_access",
            how="right", assigned_only=True,
        )

    def block_to_cd_fp500(self, df):
        """
        500 yr flood plain aggregation for block data (decennial)
        """
        return self._aggregate(
            df, "geoid_block", "cd_fp_500", "cd_fp_500",
            how="right", assigned_only=True,
        )

    def block_to_cd_fp100(self, df):
        """
        100 yr flood plain aggregation for block data (decennial)
        """
        return self._aggregate(
            df, "geoid_block", "cd_fp_100", "cd_fp_100",
            how="right", assigned_only=True,
        )

    def block_to_cd_park_access(self, df):
        """
        walk-to-park access zone aggregation for block data (decennial)
        """
        return self._aggregate(
            df, "geoid_block", "cd_park_access", "cd_park_access",
            how="right", assigned_only=True,
        )

    def tract_to_cd(self, df):
        """
        tract to cd
        """
        return self._aggregate(df, "geoid_tract", "cd", "cd", how="left")

    @cached_property
    def aggregate_vertical_options(self):
        """
        Dispatch table: source name -> input geography -> target geotype ->
        the bound method that performs that aggregation.
        """
        return {
            "decennial": {
                "tract": {"NTA": self.tract_to_nta, "cd": self.tract_to_cd},
                "block": {
                    "cd_fp_500": self.block_to_cd_fp500,
                    "cd_fp_100": self.block_to_cd_fp100,
                    "cd_park_access": self.block_to_cd_park_access,
                },
            },
            "acs": {
                "tract": {"NTA": self.tract_to_nta, "cd": self.tract_to_cd},
                "block group": {
                    "cd_fp_500": self.block_group_to_cd_fp500,
                    "cd_fp_100": self.block_group_to_cd_fp100,
                    "cd_park_access": self.block_group_to_cd_park_access,
                },
            },
        }

Static methods

def agg_moe(x)
Expand source code
@staticmethod
def agg_moe(x):
    return math.sqrt(sum([i ** 2 for i in x]))
def create_output(df, colname)

Calculate the aggregated e (summed) and m (combined with agg_moe) for each value of colname, the column to aggregate over.

Expand source code
@staticmethod
def create_output(df, colname):
    """
    Aggregate ``e`` and ``m`` for every distinct value of ``colname``.

    ``e`` is summed within each group and ``m`` is combined within each
    group by ``AggregatedGeography.agg_moe``. The grouping column comes
    back as a regular column renamed to ``census_geoid``.
    """
    estimates = df[[colname, "e"]].groupby([colname]).sum()
    margins = df[[colname, "m"]].groupby([colname]).agg(AggregatedGeography.agg_moe)
    # Both frames are indexed by `colname`; merging on that name lines the
    # two aggregates up group by group.
    combined = estimates.merge(margins, on=colname).reset_index()
    return combined.rename(columns={colname: "census_geoid"})

Instance variables

var aggregate_vertical_options
Expand source code
def __get__(self, obj, cls):
    # Descriptor hook: compute the wrapped function's value for `obj` and
    # store it in the instance dict under the function's name.
    # Looked up on the class itself (no instance): hand back the descriptor.
    if obj is None:
        return self

    # Coroutine functions take a separate path when asyncio is available.
    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    result = self.func(obj)
    obj.__dict__[self.func.__name__] = result
    return result
var lookup_geo
Expand source code
def __get__(self, obj, cls):
    # Descriptor hook: compute the wrapped function's value for `obj` and
    # store it in the instance dict under the function's name.
    # Looked up on the class itself (no instance): hand back the descriptor.
    if obj is None:
        return self

    # Coroutine functions take a separate path when asyncio is available.
    if asyncio and asyncio.iscoroutinefunction(self.func):
        return self._wrap_in_coroutine(obj)

    result = self.func(obj)
    obj.__dict__[self.func.__name__] = result
    return result

Methods

def block_group_to_cd_fp100(self, df)

100 yr flood plain aggregation for block group data (ACS)

Expand source code
def block_group_to_cd_fp100(self, df):
    """
    100 yr flood plain aggregation for block group data (ACS)

    Right-joins ``df`` (on ``census_geoid``) to the block groups that
    ``lookup_geo`` assigns a ``cd_fp_100`` value, then aggregates ``e`` and
    ``m`` per ``cd_fp_100`` with ``create_output``.

    Returns a DataFrame with the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"cd_fp_100"``), ``e`` and ``m``.
    Raises IndexError when ``df`` has no rows.
    """
    # Read the variable name from the input, not from the joined frame: the
    # right join adds an all-NaN row for every lookup block group missing
    # from `df`, and such a row can come first.
    pff_variable = df["pff_variable"].iloc[0]
    crosswalk = self.lookup_geo.loc[
        self.lookup_geo.cd_fp_100.notna(), ["geoid_block_group", "cd_fp_100"]
    ].drop_duplicates()
    merged = df.merge(
        crosswalk,
        how="right",
        left_on="census_geoid",
        right_on="geoid_block_group",
    )
    output = self.create_output(merged, "cd_fp_100")
    output["pff_variable"] = pff_variable
    output["geotype"] = "cd_fp_100"
    return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def block_group_to_cd_fp500(self, df)

500 yr flood plain aggregation for block group data (ACS)

Expand source code
def block_group_to_cd_fp500(self, df):
    """
    500 yr flood plain aggregation for block group data (ACS)

    Right-joins ``df`` (on ``census_geoid``) to the block groups that
    ``lookup_geo`` assigns a ``cd_fp_500`` value, then aggregates ``e`` and
    ``m`` per ``cd_fp_500`` with ``create_output``.

    Returns a DataFrame with the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"cd_fp_500"``), ``e`` and ``m``.
    Raises IndexError when ``df`` has no rows.
    """
    # Read the variable name from the input, not from the joined frame: the
    # right join adds an all-NaN row for every lookup block group missing
    # from `df`, and such a row can come first.
    pff_variable = df["pff_variable"].iloc[0]
    crosswalk = self.lookup_geo.loc[
        self.lookup_geo.cd_fp_500.notna(), ["geoid_block_group", "cd_fp_500"]
    ].drop_duplicates()
    merged = df.merge(
        crosswalk,
        how="right",
        left_on="census_geoid",
        right_on="geoid_block_group",
    )
    output = self.create_output(merged, "cd_fp_500")
    output["pff_variable"] = pff_variable
    output["geotype"] = "cd_fp_500"
    return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def block_group_to_cd_park_access(self, df)

walk-to-park access zone aggregation for block group data (ACS)

Expand source code
def block_group_to_cd_park_access(self, df):
    """
    walk-to-park access zone aggregation for block group data (ACS)

    Right-joins ``df`` (on ``census_geoid``) to the block groups that
    ``lookup_geo`` assigns a ``cd_park_access`` value, then aggregates ``e``
    and ``m`` per ``cd_park_access`` with ``create_output``.

    Returns a DataFrame with the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"cd_park_access"``), ``e`` and ``m``.
    Raises IndexError when ``df`` has no rows.
    """
    # Read the variable name from the input, not from the joined frame: the
    # right join adds an all-NaN row for every lookup block group missing
    # from `df`, and such a row can come first.
    pff_variable = df["pff_variable"].iloc[0]
    crosswalk = self.lookup_geo.loc[
        self.lookup_geo.cd_park_access.notna(),
        ["geoid_block_group", "cd_park_access"],
    ].drop_duplicates()
    merged = df.merge(
        crosswalk,
        how="right",
        left_on="census_geoid",
        right_on="geoid_block_group",
    )
    output = self.create_output(merged, "cd_park_access")
    output["pff_variable"] = pff_variable
    output["geotype"] = "cd_park_access"
    return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def block_to_cd_fp100(self, df)

100 yr flood plain aggregation for block data (decennial)

Expand source code
def block_to_cd_fp100(self, df):
    """
    100 yr flood plain aggregation for block data (decennial)

    Right-joins ``df`` (on ``census_geoid``) to the blocks that
    ``lookup_geo`` assigns a ``cd_fp_100`` value, then aggregates ``e`` and
    ``m`` per ``cd_fp_100`` with ``create_output``.

    Returns a DataFrame with the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"cd_fp_100"``), ``e`` and ``m``.
    Raises IndexError when ``df`` has no rows.
    """
    # Read the variable name from the input, not from the joined frame: the
    # right join adds an all-NaN row for every lookup block missing from
    # `df`, and such a row can come first.
    pff_variable = df["pff_variable"].iloc[0]
    crosswalk = self.lookup_geo.loc[
        self.lookup_geo.cd_fp_100.notna(), ["geoid_block", "cd_fp_100"]
    ].drop_duplicates()
    merged = df.merge(
        crosswalk,
        how="right",
        left_on="census_geoid",
        right_on="geoid_block",
    )
    output = self.create_output(merged, "cd_fp_100")
    output["pff_variable"] = pff_variable
    output["geotype"] = "cd_fp_100"
    return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def block_to_cd_fp500(self, df)

500 yr flood plain aggregation for block data (decennial)

Expand source code
def block_to_cd_fp500(self, df):
    """
    500 yr flood plain aggregation for block data (decennial)

    Right-joins ``df`` (on ``census_geoid``) to the blocks that
    ``lookup_geo`` assigns a ``cd_fp_500`` value, then aggregates ``e`` and
    ``m`` per ``cd_fp_500`` with ``create_output``.

    Returns a DataFrame with the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"cd_fp_500"``), ``e`` and ``m``.
    Raises IndexError when ``df`` has no rows.
    """
    # Read the variable name from the input, not from the joined frame: the
    # right join adds an all-NaN row for every lookup block missing from
    # `df`, and such a row can come first.
    pff_variable = df["pff_variable"].iloc[0]
    crosswalk = self.lookup_geo.loc[
        self.lookup_geo.cd_fp_500.notna(), ["geoid_block", "cd_fp_500"]
    ].drop_duplicates()
    merged = df.merge(
        crosswalk,
        how="right",
        left_on="census_geoid",
        right_on="geoid_block",
    )
    output = self.create_output(merged, "cd_fp_500")
    output["pff_variable"] = pff_variable
    output["geotype"] = "cd_fp_500"
    return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def block_to_cd_park_access(self, df)

walk-to-park access zone aggregation for block data (decennial)

Expand source code
def block_to_cd_park_access(self, df):
    """
    walk-to-park access zone aggregation for block data (decennial)

    Right-joins ``df`` (on ``census_geoid``) to the blocks that
    ``lookup_geo`` assigns a ``cd_park_access`` value, then aggregates ``e``
    and ``m`` per ``cd_park_access`` with ``create_output``.

    Returns a DataFrame with the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"cd_park_access"``), ``e`` and ``m``.
    Raises IndexError when ``df`` has no rows.
    """
    # Read the variable name from the input, not from the joined frame: the
    # right join adds an all-NaN row for every lookup block missing from
    # `df`, and such a row can come first.
    pff_variable = df["pff_variable"].iloc[0]
    crosswalk = self.lookup_geo.loc[
        self.lookup_geo.cd_park_access.notna(),
        ["geoid_block", "cd_park_access"],
    ].drop_duplicates()
    merged = df.merge(
        crosswalk,
        how="right",
        left_on="census_geoid",
        right_on="geoid_block",
    )
    output = self.create_output(merged, "cd_park_access")
    output["pff_variable"] = pff_variable
    output["geotype"] = "cd_park_access"
    return output[["census_geoid", "pff_variable", "geotype", "e", "m"]]
def tract_to_cd(self, df)

tract to cd

Expand source code
def tract_to_cd(self, df):
    """
    Aggregate tract-level rows of ``df`` up to the ``cd`` column of
    ``lookup_geo``.

    ``df`` is left-joined on ``census_geoid`` to the distinct
    (``geoid_tract``, ``cd``) pairs, then ``e`` and ``m`` are aggregated per
    ``cd``. The result has the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"cd"``), ``e`` and ``m``.
    """
    tract_cd_pairs = self.lookup_geo[["geoid_tract", "cd"]].drop_duplicates()
    df = df.merge(
        tract_cd_pairs,
        left_on="census_geoid",
        right_on="geoid_tract",
        how="left",
    )
    result = AggregatedGeography.create_output(df, "cd")
    # Every output row is labelled with the first row's variable name.
    result["pff_variable"] = df["pff_variable"].to_list()[0]
    result["geotype"] = "cd"
    column_order = ["census_geoid", "pff_variable", "geotype", "e", "m"]
    return result[column_order]
def tract_to_nta(self, df)
Expand source code
def tract_to_nta(self, df):
    """
    Aggregate tract-level rows of ``df`` up to the ``nta`` column of
    ``lookup_geo``.

    ``df`` is left-joined on ``census_geoid`` to the distinct
    (``geoid_tract``, ``nta``) pairs, then ``e`` and ``m`` are aggregated per
    ``nta``. The result has the columns ``census_geoid``, ``pff_variable``,
    ``geotype`` (always ``"NTA"``), ``e`` and ``m``.
    """
    tract_nta_pairs = self.lookup_geo[["geoid_tract", "nta"]].drop_duplicates()
    df = df.merge(
        tract_nta_pairs,
        left_on="census_geoid",
        right_on="geoid_tract",
        how="left",
    )
    result = AggregatedGeography.create_output(df, "nta")
    # Every output row is labelled with the first row's variable name.
    result["pff_variable"] = df["pff_variable"].to_list()[0]
    result["geotype"] = "NTA"
    column_order = ["census_geoid", "pff_variable", "geotype", "e", "m"]
    return result[column_order]