Module library.script.nypl_libraries

Expand source code
import json

import pandas as pd
import requests

from . import df_to_tempfile


class Scriptor:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def ingest(self) -> pd.DataFrame:
        url = "https://refinery.nypl.org/api/nypl/locations/v1.0/locations"
        content = requests.get(url).content
        records = json.loads(content)["locations"]
        data = []
        for i in records:
            parsed = dict(
                lon=str(i["geolocation"]["coordinates"][0]),
                lat=str(i["geolocation"]["coordinates"][1]),
                name=i["name"],
                zipcode=i["postal_code"],
                address=i["street_address"],
                locality=i["locality"],
                region=i["region"],
            )
            data.append(parsed)
        df = pd.DataFrame.from_dict(data, orient="columns")
        return df

    def runner(self) -> str:
        df = self.ingest()
        local_path = df_to_tempfile(df)
        return local_path

Classes

class Scriptor (**kwargs)
Expand source code
class Scriptor:
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)

    def ingest(self) -> pd.DataFrame:
        url = "https://refinery.nypl.org/api/nypl/locations/v1.0/locations"
        content = requests.get(url).content
        records = json.loads(content)["locations"]
        data = []
        for i in records:
            parsed = dict(
                lon=str(i["geolocation"]["coordinates"][0]),
                lat=str(i["geolocation"]["coordinates"][1]),
                name=i["name"],
                zipcode=i["postal_code"],
                address=i["street_address"],
                locality=i["locality"],
                region=i["region"],
            )
            data.append(parsed)
        df = pd.DataFrame.from_dict(data, orient="columns")
        return df

    def runner(self) -> str:
        df = self.ingest()
        local_path = df_to_tempfile(df)
        return local_path

Methods

def ingest(self) ‑> pandas.core.frame.DataFrame
Expand source code
def ingest(self) -> pd.DataFrame:
    url = "https://refinery.nypl.org/api/nypl/locations/v1.0/locations"
    content = requests.get(url).content
    records = json.loads(content)["locations"]
    data = []
    for i in records:
        parsed = dict(
            lon=str(i["geolocation"]["coordinates"][0]),
            lat=str(i["geolocation"]["coordinates"][1]),
            name=i["name"],
            zipcode=i["postal_code"],
            address=i["street_address"],
            locality=i["locality"],
            region=i["region"],
        )
        data.append(parsed)
    df = pd.DataFrame.from_dict(data, orient="columns")
    return df
def runner(self) ‑> str
Expand source code
def runner(self) -> str:
    df = self.ingest()
    local_path = df_to_tempfile(df)
    return local_path