Example ETL/metadata pipeline

This is a quick guide to building a full ETL pipeline, along with associated metadata, for the Data Observatory.

As an example, we will bring in the Quarterly Census of Employment and Wages (QCEW), a product of the Bureau of Labor Statistics. This dataset tracks the number of employees, firms, and average wages across the full gamut of North American Industry Classification System (NAICS) industries.

As its name says, QCEW is released quarterly, and counties are the smallest geography it covers.

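The process of building a Python module to bring a new dataset into the Data Observatory can be broadly divided into six steps:

  1. Import libraries
  2. Download the data
  3. Import data into PostgreSQL
  4. Preprocess data in PostgreSQL
  5. Write metadata
  6. Populate output table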

We use Luigi to isolate each step into a Task. A Task has well-defined inputs (other Tasks) and outputs (files, tables on disk, etc.). In a nutshell:

  • a Task cannot be run if it is complete
  • a Task is complete if all of its outputs exist
  • a Task can only run once all of its requirements are complete
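To make these rules concrete, here is a deliberately simplified, single-process model of them in plain Python. ToyTask and build are hypothetical names invented for this sketch; Luigi's real scheduler also deals with workers, failures, and real output targets, so treat this only as an illustration of the three rules.

```python
class ToyTask(object):
    """Toy stand-in for a Luigi Task; `done` plays the role of "all outputs exist"."""

    def __init__(self, name, requires=()):
        self.name = name
        self._requires = list(requires)
        self.done = False
        self.run_count = 0

    def requires(self):
        return self._requires

    def complete(self):
        # A task is complete if all of its outputs exist
        return self.done

    def run(self):
        self.run_count += 1
        self.done = True  # running the task creates its outputs


def build(task):
    """Run `task`, first running any of its requirements that are not complete."""
    if task.complete():
        return  # a complete task is never run again
    for requirement in task.requires():
        build(requirement)  # every requirement must be complete first
    task.run()


if __name__ == '__main__':
    download = ToyTask('DownloadQCEW')
    raw = ToyTask('RawQCEW', requires=[download])
    simple = ToyTask('SimpleQCEW', requires=[raw])
    build(simple)
    build(simple)  # second call is a no-op: everything is already complete
    print([t.run_count for t in (download, raw, simple)])  # [1, 1, 1]
```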

Each of the steps except (1) corresponds to a Task.

The flow of Task dependencies, and the Bigmetadata utility class each step's Task should be subclassed from, can be charted like this:

Step               Bigmetadata base class(es)
Download data      DownloadUnzipTask, Task
Import data        CSV2TempTableTask, Shp2TempTableTask, TempTableTask
Preprocess data    TempTableTask
Write metadata     ColumnsTask
Output table       TableTask

We use a set of utility classes to avoid writing repetitive code.

To get started, make sure you’re running the IPython notebook container.

docker-compose up -d ipython

Then, get the port of the running IPython notebook container:

make ps

Finally, open that port in your browser.

1. Import libraries

# Import a test runner
from tests.util import runtask

# We'll need these basic utility classes and methods
from tasks.util import underscore_slugify, shell, classpath
from tasks.base_tasks import (TempTableTask, TableTask, ColumnsTask,
                              DownloadUnzipTask, CSV2TempTableTask)
from tasks.meta import current_session, DENOMINATOR

# We like OrderedDict because it makes it easy to pass dicts
# like {column name: column definition, ...} where order still
# can matter in SQL
from collections import OrderedDict
from luigi import IntParameter, Parameter
import os

# These imports are useful for checking the database
from tasks.meta import OBSTable, OBSColumn, OBSTag

# We'll also want these tags for metadata
from tasks.tags import SectionTags, SubsectionTags, UnitTags

2. Download the data

The first step of most ETLs is going to be downloading the source and saving it to a temporary folder.

DownloadUnzipTask is a utility class that handles file naming and unzipping of the temporary output for you. You only have to write the download method, which saves the archive to the output path with a .zip suffix.

class DownloadQCEW(DownloadUnzipTask):

    year = IntParameter()

    URL = 'http://www.bls.gov/cew/data/files/{year}/csv/{year}_qtrly_singlefile.zip'

    def download(self):
        shell('wget -O {output}.zip {url}'.format(
            output=self.output().path,
            url=self.URL.format(year=self.year)
        ))
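Conceptually, the pattern DownloadUnzipTask implements for us looks roughly like the following sketch. download_and_unzip is a hypothetical function, not part of Bigmetadata, and details such as extracting through a temporary folder and deleting the .zip afterwards are choices made for this sketch; the real class may differ.

```python
import os
import zipfile


def download_and_unzip(output_path, download):
    """Hypothetical sketch of the DownloadUnzipTask pattern (not its actual code).

    `download` is any callable that writes a zip archive to output_path + '.zip',
    as DownloadQCEW.download() does with wget. The archive is then extracted
    into the directory `output_path`, whose existence marks the work as done.
    """
    if os.path.isdir(output_path):
        return output_path  # output already exists: nothing to do
    download(output_path)
    zip_path = output_path + '.zip'
    partial_dir = output_path + '.partial'
    with zipfile.ZipFile(zip_path) as archive:
        archive.extractall(partial_dir)
    # Rename only after a successful extraction, so a crash never leaves a
    # half-filled output folder that would look complete
    os.rename(partial_dir, output_path)
    os.remove(zip_path)
    return output_path
```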

Within the IPython environment, we can create the task and run it in a sandbox with runtask.

We must pass the year, since it is declared as a parameter without a default.

download_task = DownloadQCEW(year=2014)
runtask(download_task)

Provided the output folder exists, the DownloadQCEW task for 2014 will not run again.

download_task.output().path
'tmp/tmp/DownloadQCEW_2014_cfabf27024'
download_task.output().exists()
True

3. Import data into PostgreSQL

Much of the processing is easiest to do in PostgreSQL, and we have utility classes that make it simple to bring both Shapefiles and CSVs into PostgreSQL.

For CSV2TempTableTask, we only have to define an input_csv method that returns a path (or iterable of paths) to the CSV(s). The header row is read automatically and used to construct the table schema.

Luigi's standard requires method is used here, too. It ensures that the DownloadQCEW task for the same year has run beforehand, and makes that task's output available as the input of this one.

class RawQCEW(CSV2TempTableTask):

    year = IntParameter()

    def requires(self):
        return DownloadQCEW(year=self.year)

    def input_csv(self):
        return os.path.join(self.input().path,
                            '{}.q1-q4.singlefile.csv'.format(self.year))
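The schema step of CSV2TempTableTask can be pictured with the minimal sketch below, which turns a CSV header row into a CREATE TABLE statement. create_table_sql is a hypothetical helper, and typing every column as Text is an assumption of this sketch (it is at least consistent with the quoted comparisons such as year = '2014' used in the next step).

```python
import csv
import io


def create_table_sql(table_name, csv_file):
    """Build a CREATE TABLE statement from the header row of an open CSV file.

    Every column is typed as Text; that is an assumption of this sketch.
    """
    header = next(csv.reader(csv_file))
    columns = ', '.join(
        '"{}" Text'.format(name.strip().replace('"', '""'))  # quote identifiers safely
        for name in header
    )
    return 'CREATE TABLE {} ({})'.format(table_name, columns)


if __name__ == '__main__':
    sample = io.StringIO(u'area_fips,own_code,industry_code\n01001,5,10\n')
    print(create_table_sql('raw_qcew', sample))
```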

Run the task. If the table exists and has more than 0 rows, it will not be run again.

# Roll back any pending or failed transaction in the shared session before starting
current_session().rollback()
raw_task = RawQCEW(year=2014)

runtask(raw_task)

Confirm the task has completed successfully.

raw_task.complete()
True

The session returned by current_session() can also be used to execute raw queries against the table.

The output of a TempTableTask can be queried directly through its table attribute, a string holding the fully schema-qualified table name. These names are guaranteed to be unique to the module, task, and parameters, without our having to come up with any names manually.

raw_task.output().table
'"tmp".RawQCEW_2014_cfabf27024'
session = current_session()
resp = session.execute('select count(*) from {}'.format(raw_task.output().table))
resp.fetchall()
[(14276508L,)]
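The generated names follow a recognizable pattern: class name, a short summary of the parameter values, and a ten-character hash. DownloadQCEW and RawQCEW for 2014 share the hash cfabf27024, and SimpleQCEW lists qtr before year. This matches how Luigi builds task IDs, so, assuming Bigmetadata derives its names from them, the scheme can be sketched as follows. The constants and the task_id function are modeled on Luigi's task_id_str as we understand it, not copied from it; check luigi/task.py for the authoritative version.

```python
import hashlib
import json
import re

# Assumed to mirror Luigi's defaults for task IDs (verify against luigi/task.py)
INCLUDE_PARAMS = 3       # how many parameter values appear in the summary
TRUNCATE_PARAMS = 16     # max characters kept from each value
TRUNCATE_HASH = 10       # hex characters kept from the hash
INVALID_CHARS = re.compile(r'[^A-Za-z0-9_]')


def task_id(task_family, params):
    """Build 'Family_summary_hash' from a dict of string-serialized parameters."""
    param_str = json.dumps(params, separators=(',', ':'), sort_keys=True)
    param_hash = hashlib.md5(param_str.encode('utf-8')).hexdigest()
    # Parameter names are sorted, so 'qtr' comes before 'year'
    summary = '_'.join(INVALID_CHARS.sub('_', params[key][:TRUNCATE_PARAMS])
                       for key in sorted(params)[:INCLUDE_PARAMS])
    return '{}_{}_{}'.format(task_family, summary, param_hash[:TRUNCATE_HASH])


if __name__ == '__main__':
    print(task_id('SimpleQCEW', {'year': '2014', 'qtr': '4'}))
```

Because the hash depends only on the parameters, two different task classes with identical parameters end up with the same suffix, which is what the DownloadQCEW and RawQCEW paths show.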

4. Preprocess data in PostgreSQL

QCEW data has a lot of rows we don't actually need; these are easy to filter out in SQL.

The QCEW download files are annual but contain quarterly time periods, whereas each output table should cover a single point in time, so we keep only one year and quarter. We're also only interested in private employment (own_code = '5') and county-level aggregation by total (71), supersector (73), and NAICS sector (74).

class SimpleQCEW(TempTableTask):

    year = IntParameter()
    qtr = IntParameter()

    def requires(self):
        return RawQCEW(year=self.year)

    def run(self):
        session = current_session()
        session.execute("CREATE TABLE {output} AS "
                        "SELECT * FROM {input} "
                        "WHERE agglvl_code IN ('74', '73', '71') "
                        "  AND year = '{year}' "
                        "  AND qtr = '{qtr}' "
                        "  AND own_code = '5' ".format(
                            input=self.input().table,
                            output=self.output().table,
                            year=self.year,
                            qtr=self.qtr,
                       ))
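The filter can be tried out without a Data Observatory database. The sketch below uses SQLite (Python's built-in sqlite3 module) as a stand-in for PostgreSQL and runs the same WHERE clause over a few made-up rows. Unlike SimpleQCEW, it passes year and quarter as bound parameters rather than formatting them into the SQL string; that is a design choice of this sketch, not something the original requires.

```python
import sqlite3

# Same WHERE clause as SimpleQCEW.run(), with year and quarter as bound parameters
FILTER_SQL = (
    "SELECT area_fips, industry_code FROM raw_qcew "
    "WHERE agglvl_code IN ('74', '73', '71') "
    "  AND year = ? "
    "  AND qtr = ? "
    "  AND own_code = '5'"
)


def filter_qcew(conn, year, qtr):
    # Values are compared as text, matching the quoted literals in SimpleQCEW
    return conn.execute(FILTER_SQL, (str(year), str(qtr))).fetchall()


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE raw_qcew (area_fips TEXT, own_code TEXT, industry_code TEXT, '
             'agglvl_code TEXT, year TEXT, qtr TEXT)')
# Made-up rows, just to exercise each condition of the filter
conn.executemany('INSERT INTO raw_qcew VALUES (?, ?, ?, ?, ?, ?)', [
    ('01001', '5', '10', '71', '2014', '4'),    # kept
    ('01001', '5', '1025', '73', '2014', '4'),  # kept
    ('01001', '1', '10', '71', '2014', '4'),    # dropped: not private ownership
    ('01001', '5', '10', '71', '2014', '3'),    # dropped: wrong quarter
    ('US000', '5', '10', '10', '2014', '4'),    # dropped: not a county-level code
])
rows = filter_qcew(conn, 2014, 4)
print(sorted(rows))  # [('01001', '10'), ('01001', '1025')]
```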

Run the task and confirm it completed. We don't have to run each step as we write it: because every task declares its requirements, running a task first runs anything it depends on that isn't already complete.

simple_task = SimpleQCEW(year=2014, qtr=4)
runtask(simple_task)
simple_task.complete()
True
simple_task.output().table
'"tmp".SimpleQCEW_4_2014_79152e4934'
resp = session.execute('select count(*) from {}'.format(simple_task.output().table))
resp.fetchall()
[(97167L,)]

5. Write metadata

We have to create metadata for the QCEW measures we're interested in. Metadata tasks often take no parameters, but this one takes a NAICS code, since we have to reorganize the table from one row per NAICS code to one column per NAICS code, which is easiest done programmatically.

ColumnsTask provides a structure for generating metadata. Its only required method is columns, which must return an OrderedDict with string keys and OBSColumn values. The keys may be used as human-readable column names in tables based on this metadata, although that is not always the case. If an OBSColumn's id is left blank, the dict key is used to generate it (qualified by the module).

By convention, there is also a requires method that brings in our standard tags: SectionTags, SubsectionTags, and UnitTags. This is an example of defining several tasks as prerequisites in a dict: the output of each is accessible via self.input()[<key>] in other methods. QCEWColumns also requires the columns for its parent NAICS code, when there is one.

from tasks.us.naics import (NAICS_CODES, is_supersector, is_sector,
                            get_parent_code)

class QCEWColumns(ColumnsTask):

    naics_code = Parameter()

    def requires(self):
        requirements = {
            'sections': SectionTags(),
            'subsections': SubsectionTags(),
            'units': UnitTags(),
        }
        parent_code = get_parent_code(self.naics_code)
        if parent_code:
            requirements['parent'] = QCEWColumns(naics_code=parent_code)

        return requirements

    def columns(self):
        cols = OrderedDict()
        code, name, description = self.naics_code, NAICS_CODES[self.naics_code], ''

        # This gives us easier access to the tags we defined as dependencies
        input_ = self.input()
        units = input_['units']
        sections = input_['sections']
        subsections = input_['subsections']
        parent = input_.get('parent')
        cols['avg_wkly_wage'] = OBSColumn(
            # Make sure the column ID is unique within this module
            # If left blank, will be taken from this column's key in the output OrderedDict
            id=underscore_slugify(u'avg_wkly_wage_{}'.format(code)),
            # The PostgreSQL type of this column.  Generally Numeric for numbers and Text
            # for categories.
            type='Numeric',
            # Human-readable name.  Will be used as header in the catalog
            name=u'Average weekly wage for {} establishments'.format(name),
            # Human-readable description.  Will be used as content in the catalog.
            description=u'Average weekly wage for a given quarter in the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            # Ranking of importance, sometimes used to favor certain measures in auto-selection
            # Weight of 0 will hide this column from the user.  We generally use between 0 and 10
            weight=5,
            # How this measure was derived, for example "sum", "median", "average", etc.
            # In cases of "sum", this means functions downstream can construct estimates
            # for arbitrary geographies
            aggregate='average',
            # Tags are our way of noting aspects of this measure like its unit, the country
            # it's relevant to, and which section(s) of the catalog it should appear in.
            tags=[units['money'], sections['united_states'], subsections['income']],
        )
        cols['qtrly_estabs'] = OBSColumn(
            id=underscore_slugify(u'qtrly_estabs_{}'.format(code)),
            type='Numeric',
            name=u'Establishments in {}'.format(name),
            description=u'Count of establishments in a given quarter in the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            weight=5,
            aggregate='sum',
            tags=[units['businesses'], sections['united_states'], subsections['commerce_economy']],
            targets={parent['qtrly_estabs']: DENOMINATOR} if parent else {},
        )
        cols['month3_emplvl'] = OBSColumn(
            id=underscore_slugify(u'month3_emplvl_{}'.format(code)),
            type='Numeric',
            name=u'Employees in {} establishments'.format(name),
            description=u'Number of employees in the third month of a given quarter within the {name} '
                        u'industry (NAICS {code}). {name} is {description}.'.format(
                            name=name, code=code, description=description),
            weight=5,
            aggregate='sum',
            tags=[units['people'], sections['united_states'], subsections['employment']],
        )
        cols['lq_avg_wkly_wage'] = OBSColumn(
            id=underscore_slugify(u'lq_avg_wkly_wage_{}'.format(code)),
            type='Numeric',
            name=u'Average weekly wage location quotient for {} establishments'.format(name),
            description=u'Location quotient of the average weekly wage for a given quarter relative to '
                        u'the U.S. (Rounded to the hundredths place) within the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            weight=3,
            aggregate=None,
            tags=[units['ratio'], sections['united_states'], subsections['income']],
        )
        cols['lq_qtrly_estabs'] = OBSColumn(
            id=underscore_slugify(u'lq_qtrly_estabs_{}'.format(code)),
            type='Numeric',
            name=u'Location quotient of establishments in {}'.format(name),
            description=u'Location quotient of the quarterly establishment count relative to '
                        u'the U.S. (Rounded to the hundredths place) within the {name} industry (NAICS {code}). '
                        u'{name} is {description}.'.format(name=name, code=code, description=description),
            weight=3,
            aggregate=None,
            tags=[units['ratio'], sections['united_states'], subsections['commerce_economy']],
        )
        cols['lq_month3_emplvl'] = OBSColumn(
            id=underscore_slugify(u'lq_month3_emplvl_{}'.format(code)),
            type='Numeric',
            name=u'Employment level location quotient in {} establishments'.format(name),
            description=u'Location quotient of the employment level for the third month of a given quarter '
                        u'relative to the U.S. (Rounded to the hundredths place) within the {name} '
                        u'industry (NAICS {code}). {name} is {description}.'.format(
                            name=name, code=code, description=description),
            weight=3,
            aggregate=None,
            tags=[units['ratio'], sections['united_states'], subsections['employment']],
        )
        return cols
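Because requires() adds QCEWColumns for the parent code, requesting one NAICS code pulls in its whole chain of ancestors first. The sketch below models that recursion with a tiny, hand-written parent map; TOY_PARENTS and column_creation_order are hypothetical, and the real hierarchy comes from get_parent_code in tasks.us.naics. This is presumably also why running the columns for '1025' commits columns for code 10 (Total, all industries).

```python
# Illustrative parent map only; real codes and parents come from tasks.us.naics
TOY_PARENTS = {
    '10': None,     # Total, all industries
    '1025': '10',   # Education and health services (supersector)
    '62': '1025',   # Health care and social assistance (sector)
}


def column_creation_order(code, order=None):
    """List NAICS codes in the order their ColumnsTasks would complete.

    Mirrors QCEWColumns.requires(): the parent code's columns are required,
    so they are resolved (recursively) before the code's own columns.
    """
    if order is None:
        order = []
    parent = TOY_PARENTS.get(code)
    if parent and parent not in order:
        column_creation_order(parent, order)
    if code not in order:
        order.append(code)
    return order


if __name__ == '__main__':
    print(column_creation_order('62'))  # ['10', '1025', '62']
```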

Metadata tasks should not normally be run on their own, since they should be declared as requirements of a TableTask (below), but it is possible to do so, as this example shows.

NAICS code '1025' is the supersector for education & health.

education_health_columns = QCEWColumns(naics_code='1025')
runtask(education_health_columns)
education_health_columns.complete()
True

Output from a ColumnsTask is an OrderedDict with the columns wrapped in ColumnTargets, which allow us to pass them around without immediately committing them to the database.

education_health_columns.output()
OrderedDict([('avg_wkly_wage', <tasks.targets.ColumnTarget at 0x7f12a40eead0>),
             ('qtrly_estabs', <tasks.targets.ColumnTarget at 0x7f12a5831c50>),
             ('month3_emplvl', <tasks.targets.ColumnTarget at 0x7f12a5831090>),
             ('lq_avg_wkly_wage', <tasks.targets.ColumnTarget at 0x7f12a4338b10>),
             ('lq_qtrly_estabs', <tasks.targets.ColumnTarget at 0x7f12a4d1fb50>),
             ('lq_month3_emplvl',
              <tasks.targets.ColumnTarget at 0x7f12a4525390>)])
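The idea behind ColumnTargets, defining a column now and committing it later, can be illustrated with a toy class. DeferredColumn and the dict used as a store are hypothetical and do not reflect the real ColumnTarget API; the point is only that passing targets around has no side effects until they are committed.

```python
from collections import OrderedDict


class DeferredColumn(object):
    """Hypothetical stand-in for a ColumnTarget: holds a column definition and
    writes it to a store only when commit() is called."""

    def __init__(self, column_id, definition):
        self.column_id = column_id
        self.definition = definition

    def commit(self, store):
        # setdefault keeps the first definition, so repeated commits are harmless
        return store.setdefault(self.column_id, self.definition)


store = {}  # plays the role of the metadata database
targets = OrderedDict([
    ('qtrly_estabs', DeferredColumn('tmp.qtrly_estabs_1025',
                                    {'type': 'Numeric', 'aggregate': 'sum'})),
])
passed_along = OrderedDict(targets)  # handing targets around does not touch the store
assert store == {}
for target in passed_along.values():
    target.commit(store)
print(sorted(store))  # ['tmp.qtrly_estabs_1025']
```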

Since we ran the task, we can check the OBSColumn table for evidence that our metadata has been committed to the database.

[(col.id, col.name) for col in session.query(OBSColumn)[:5]]
[(u'tmp.avg_wkly_wage_10',
  u'Average weekly wage for Total, all industries establishments'),
 (u'tmp.qtrly_estabs_10', u'Establishments in Total, all industries'),
 (u'tmp.month3_emplvl_10',
  u'Employees in Total, all industries establishments'),
 (u'tmp.lq_avg_wkly_wage_10',
  u'Average weekly wage location quotient for Total, all industries establishments'),
 (u'tmp.lq_qtrly_estabs_10',
  u'Location quotient of establishments in Total, all industries')]

6. Populate output table

Now that our data is in a format close to what we'll need, and our metadata is lined up, we can tie them together with a TableTask. Under the hood, TableTask handles the relational lifting between columns and actual data, and gives the output table a unique hash-based name.

Several methods must be overridden for TableTask to work:

  • version(): a version number, which is useful for forcing a re-run/overwrite without having to track down and delete output artifacts.
  • table_timespan(): the timespan (for example, '2014' or '2012Q4') that identifies the date range or point in time for this table.
  • columns(): an OrderedDict of (colname, ColumnTarget) pairs. This should be constructed by pulling the desired columns from required ColumnsTask classes.
  • populate(): a method that populates the output table, most often via INSERT.
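The QCEW task that follows names each output column by joining the metadata key, the NAICS code, and the NAICS name, then slugifying the result. As a rough illustration, this sketch approximates underscore_slugify with a regular expression; slugify_underscore and output_column_name are hypothetical helpers, and the real function may treat punctuation or non-ASCII characters differently.

```python
import re


def slugify_underscore(text):
    """Rough stand-in for underscore_slugify; the real function's rules may differ.

    Lowercases the text and collapses each run of characters other than
    a-z and 0-9 into a single underscore.
    """
    return re.sub(r'[^a-z0-9]+', '_', text.lower()).strip('_')


def output_column_name(key, naics_code, naics_name):
    # Same pattern as QCEW.columns(): measure key, then NAICS code, then NAICS name
    return slugify_underscore(u'{}_{}_{}'.format(key, naics_code, naics_name))


if __name__ == '__main__':
    print(output_column_name('qtrly_estabs', '10', 'Total, all industries'))
    # qtrly_estabs_10_total_all_industries
```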

# Since the 'area_fips' column is a shared reference to geometries
# ('geom_ref'), we have to import the task that defines that column.
from tasks.us.census.tiger import GeoidColumns

class QCEW(TableTask):

    year = IntParameter()
    qtr = IntParameter()

    def version(self):
        return 1

    def requires(self):
        requirements = {
            'data': SimpleQCEW(year=self.year, qtr=self.qtr),
            'geoid_cols': GeoidColumns(),
            'naics': OrderedDict()
        }
        for naics_code, naics_name in NAICS_CODES.iteritems():
            # Only include the more general NAICS codes
            if is_supersector(naics_code) or is_sector(naics_code) or naics_code == '10':
                requirements['naics'][naics_code] = QCEWColumns(naics_code=naics_code)
        return requirements

    def table_timespan(self):
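        # get_timespan is not among the imports at the top of this guide; import it
        # from wherever your Bigmetadata version defines it.
        return get_timespan('{year}Q{qtr}'.format(year=self.year, qtr=self.qtr))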

    def columns(self):
        # Here we assemble an OrderedDict using our requirements to specify the
        # columns that go into this table. Each column name combines the
        # measure key, the NAICS code, and the NAICS name.
        input_ = self.input()
        cols = OrderedDict([
            ('area_fips', input_['geoid_cols']['county_geoid'])
        ])
        for naics_code, naics_cols in input_['naics'].iteritems():
            for key, coltarget in naics_cols.iteritems():
                naics_name = NAICS_CODES[naics_code]
                colname = underscore_slugify(u'{}_{}_{}'.format(
                        key, naics_code, naics_name))
                cols[colname] = coltarget
        return cols

    def populate(self):
        # This select statement transforms the input table, taking advantage of our
        # new column names.
        # The session is automatically committed if there are no errors.
        session = current_session()
        columns = self.columns()
        colnames = columns.keys()
        select_colnames = []
        for naics_code, naics_columns in self.input()['naics'].iteritems():
            for colname, coltarget in naics_columns.iteritems():
                select_colnames.append('''MAX(CASE
                    WHEN industry_code = '{naics_code}' THEN {colname} ELSE NULL
                END)::Numeric'''.format(naics_code=naics_code,
                            colname=colname
                          ))
        insert = '''INSERT INTO {output} ({colnames})
                    SELECT area_fips, {select_colnames}
                    FROM {input}
                    GROUP BY area_fips '''.format(
                        output=self.output().table,
                        input=self.input()['data'].table,
                        colnames=', '.join(colnames),
                        select_colnames=', '.join(select_colnames),
                    )
        session.execute(insert)
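The heart of populate() is a pivot: rows keyed by (area_fips, industry_code) become a single row per county, with one column per (NAICS code, measure) pair, using MAX over a CASE expression. The sketch below reproduces that query shape in SQLite on made-up rows; pivot_sql is a hypothetical helper, and the PostgreSQL ::Numeric cast is dropped for SQLite. Counties with no row for a given NAICS code get NULLs, which is presumably where the None values in the sample output further down come from.

```python
import sqlite3


def pivot_sql(measures, naics_codes):
    """Build a one-row-per-county pivot in the style of QCEW.populate().

    One MAX(CASE ...) expression per (NAICS code, measure) pair, NAICS codes in
    the outer loop and measures in the inner loop, as in populate().
    """
    exprs = []
    for code in naics_codes:
        for measure in measures:
            exprs.append("MAX(CASE WHEN industry_code = '{}' THEN {} ELSE NULL END)"
                         .format(code, measure))
    return ('SELECT area_fips, {} FROM simple_qcew '
            'GROUP BY area_fips ORDER BY area_fips'.format(', '.join(exprs)))


conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE simple_qcew (area_fips TEXT, industry_code TEXT, '
             'avg_wkly_wage NUMERIC, qtrly_estabs NUMERIC)')
# Made-up rows: county 01003 has no row for NAICS 1025
conn.executemany('INSERT INTO simple_qcew VALUES (?, ?, ?, ?)', [
    ('01001', '10', 700, 1000),
    ('01001', '1025', 600, 150),
    ('01003', '10', 800, 4000),
])
rows = conn.execute(pivot_sql(['avg_wkly_wage', 'qtrly_estabs'], ['10', '1025'])).fetchall()
print(rows)  # [('01001', 700, 1000, 600, 150), ('01003', 800, 4000, None, None)]
```

MAX ignores NULLs, so for each county and code it simply picks out the single matching value; any other aggregate that returns its only non-NULL input would work the same way.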

On a fresh database, table_task.complete() returns False until the task has run. The task will not run again if it has already been run for this year and quarter combination.

table_task = QCEW(year=2014, qtr=4)
runtask(table_task)
table_task.complete()
True

The table should now exist in the metadata as well as in the data, with all relations well-defined.

Unlike the TempTableTasks above, the output of a TableTask is a PostgreSQL table in the observatory schema, with a unique hashed name.

table = table_task.output()
table.table
'observatory.obs_3dc49b70f71ed9bbf5b4a48773c860519af70e1e'

We can also peek at the output data.

session.execute('SELECT * FROM {} LIMIT 1'.format(table.table)).fetchall()
[(u'01001', None, None, None, None, None, None, Decimal('395'), Decimal('5'), Decimal('144'), Decimal('0.65'), Decimal('0.52'), Decimal('0.68'), Decimal('609'), Decimal('80'), Decimal('1024'), Decimal('0.96'), Decimal('0.66'), Decimal('0.74'), Decimal('364'), Decimal('68'), Decimal('368'), Decimal('0.79'), Decimal('0.95'), Decimal('1.13'), Decimal('917'), Decimal('3'), Decimal('66'), Decimal('0.68'), Decimal('0.94'), Decimal('1.00'), Decimal('2317'), Decimal('5'), Decimal('103'), Decimal('1.89'), Decimal('3.26'), Decimal('2.45'), Decimal('914'), Decimal('77'), Decimal('426'), Decimal('1.17'), Decimal('1.16'), Decimal('0.90'), Decimal('1231'), Decimal('33'), Decimal('157'), Decimal('1.26'), Decimal('0.60'), Decimal('0.35'), Decimal('925'), Decimal('20'), Decimal('198'), Decimal('1.14'), Decimal('1.66'), Decimal('1.30'), Decimal('914'), Decimal('77'), Decimal('426'), Decimal('1.17'), Decimal('1.16'), Decimal('0.90'), Decimal('1225'), Decimal('30'), Decimal('1347'), Decimal('1.45'), Decimal('1.01'), Decimal('1.44'), Decimal('584'), Decimal('85'), Decimal('1168'), Decimal('0.93'), Decimal('0.65'), Decimal('0.73'), Decimal('904'), Decimal('91'), Decimal('380'), Decimal('0.98'), Decimal('0.61'), Decimal('0.25'), None, None, None, None, None, None, Decimal('433'), Decimal('149'), Decimal('1935'), Decimal('1.13'), Decimal('1.63'), Decimal('1.57'), Decimal('1225'), Decimal('30'), Decimal('1347'), Decimal('1.45'), Decimal('1.01'), Decimal('1.44'), Decimal('274'), Decimal('66'), Decimal('1432'), Decimal('1.10'), Decimal('1.13'), Decimal('1.50'), Decimal('301'), Decimal('8'), Decimal('66'), Decimal('0.53'), Decimal('0.68'), Decimal('0.44'), Decimal('620'), Decimal('15'), Decimal('127'), Decimal('0.97'), Decimal('0.73'), Decimal('0.36'), None, None, None, None, None, None, Decimal('929'), Decimal('17'), Decimal('132'), Decimal('2.13'), Decimal('1.91'), Decimal('1.53'), Decimal('677'), Decimal('768'), Decimal('8173'), Decimal('0.97'), Decimal('0.95'), Decimal('0.91'), 
Decimal('364'), Decimal('68'), Decimal('368'), Decimal('0.79'), Decimal('0.95'), Decimal('1.13'), Decimal('275'), Decimal('74'), Decimal('1498'), Decimal('0.94'), Decimal('1.06'), Decimal('1.35'), Decimal('584'), Decimal('202'), Decimal('2322'), Decimal('1.01'), Decimal('1.20'), Decimal('1.12'), Decimal('781'), Decimal('114'), Decimal('430'), Decimal('0.70'), Decimal('1.55'), Decimal('0.72'), Decimal('1201'), Decimal('7'), Decimal('36'), Decimal('1.02'), Decimal('0.52'), Decimal('0.17'), Decimal('0'), Decimal('3'), Decimal('0'), Decimal('0'), Decimal('0.57'), Decimal('0'), Decimal('860'), Decimal('62'), Decimal('190'), Decimal('0.69'), Decimal('0.62'), Decimal('0.29'), Decimal('0'), Decimal('26'), Decimal('0'), Decimal('0'), Decimal('0.59'), Decimal('0'), Decimal('1201'), Decimal('7'), Decimal('36'), Decimal('1.02'), Decimal('0.52'), Decimal('0.17'), None, None, None, None, None, None, Decimal('628'), Decimal('41'), Decimal('122'), Decimal('0.87'), Decimal('1.28'), Decimal('0.77'), Decimal('842'), Decimal('73'), Decimal('308'), Decimal('0.67'), Decimal('1.75'), Decimal('0.71'))]