Example ETL/metadata pipeline
=============================

This is a quick guide to building a full ETL pipeline, along with
associated metadata, for the Data Observatory.

As an example, we will bring in the `Quarterly Census of Employment and
Wages <http://www.bls.gov/cew/>`__ (QCEW), a product of the Bureau of
Labor Statistics.  This dataset tracks the number of employees, firms,
and average wages across the full gamut of `North American Industry
Classification System <https://www.census.gov/naics/>`__ (NAICS)
industries.  QCEW is, of course, a quarterly release, and counties are
the smallest geography considered.

The process of building a Python module to bring a new dataset into the
Data Observatory can be broadly divided into six steps:

.. contents::
   :local:

We use `Luigi <https://github.com/spotify/luigi>`__ to isolate each
step into a ``Task``.  A ``Task`` has well-defined inputs (other tasks)
and outputs (files, tables on disk, etc.).  In a nutshell:

- a task is not run if it is complete
- if all of a ``Task``'s outputs exist, then it is complete
- in order to run, all of a ``Task``'s requirements must be complete
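For instance, a minimal, self-contained task (hypothetical, not part of
this pipeline) makes those rules concrete: once its output file exists,
the task is complete and will be skipped on subsequent runs.

.. code:: python

    # A minimal, hypothetical Luigi task illustrating the completion
    # rules above (illustrative only; not part of the QCEW pipeline)
    from luigi import Task, LocalTarget

    class HelloWorld(Task):

        def output(self):
            # The task is complete exactly when this file exists
            return LocalTarget('tmp/hello.txt')

        def run(self):
            # Only runs if output() does not exist yet
            with self.output().open('w') as f:
                f.write('hello world')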
Each of the steps except (1) corresponds to a ``Task``.  The actual
flow of ``Task`` dependencies could be charted like this:

.. blockdiag::
   :desctable:

   {
     default_fontsize = 16;
     node_width = 240;
     node_height = 60;
     download_data [ label = "Download data",
       description = ":class:`~.tasks.DownloadUnzipTask`, :class:`Task`" ];
     import_data [ label = "Import data",
       description = ":class:`~.tasks.CSV2TempTableTask`, :class:`~.tasks.Shp2TempTableTask`, :class:`~.tasks.TempTableTask`" ];
     process_data [ label = "Preprocess data",
       description = ":class:`~.tasks.TempTableTask`", stacked ];
     generate_metadata [ label = "Write metadata",
       description = ":class:`~.tasks.ColumnsTask`" ];
     output_table [ label = "Output table",
       description = ":class:`~.tasks.TableTask`" ];

     process_data -> process_data;
     download_data -> import_data -> process_data -> output_table;
     generate_metadata -> output_table;
   }

Each step should be a ``Task`` subclassed from the noted Bigmetadata
utility class.  We use a set of utility classes to avoid writing
repetitive code.

To get started, make sure you're running the IPython notebook
container:

.. code:: shell

    docker-compose up -d ipython

Then, get the port of the running IPython notebook container:

.. code:: shell

    make ps

And navigate to it in your browser.

1. Import libraries
-------------------

.. code:: python

    # Import a test runner
    from tests.util import runtask

.. code:: python

    # We'll need these basic utility classes and methods
    from tasks.util import underscore_slugify, shell, classpath
    from tasks.base_tasks import (TempTableTask, TableTask, ColumnsTask,
                                  DownloadUnzipTask, CSV2TempTableTask)
    from tasks.meta import current_session, DENOMINATOR

    # We like OrderedDict because it makes it easy to pass dicts
    # like {column name : column definition, ..} where order still
    # can matter in SQL
    from collections import OrderedDict
    from luigi import IntParameter, Parameter
    import os

.. code:: python

    # These imports are useful for checking the database
    from tasks.meta import OBSTable, OBSColumn, OBSTag

.. code:: python

    # We'll also want these tags for metadata
    from tasks.tags import SectionTags, SubsectionTags, UnitTags

2. Download the data
--------------------

The first step of most ETLs is downloading the source and saving it to
a temporary folder.  ``DownloadUnzipTask`` is a utility class that
handles the file naming and unzipping of the temporary output for you.
You just have to write the ``download`` method, which downloads the
source to the output file name.

.. code:: python

    class DownloadQCEW(DownloadUnzipTask):

        year = IntParameter()

        URL = 'http://www.bls.gov/cew/data/files/{year}/csv/{year}_qtrly_singlefile.zip'

        def download(self):
            shell('wget -O {output}.zip {url}'.format(
                output=self.output().path,
                url=self.URL.format(year=self.year)
            ))

Within the IPython environment, we can create and run the task within a
sandbox.  We have to specify the year, since it's a parameter without a
default.

.. code:: python

    download_task = DownloadQCEW(year=2014)
    runtask(download_task)

Since the output folder now exists, the ``DownloadQCEW`` task for 2014
will not run again.

.. code:: python

    download_task.output().path

.. parsed-literal::

    'tmp/tmp/DownloadQCEW_2014_cfabf27024'

.. code:: python

    download_task.output().exists()

.. parsed-literal::

    True
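For reference, the pattern ``DownloadUnzipTask`` automates looks
roughly like the sketch below.  This is simplified and illustrative
(the real implementation lives in ``tasks.base_tasks``): download the
archive next to the output directory, then unzip it there, so the
existence of the directory is what marks the task complete.

.. code:: python

    # Simplified, illustrative sketch of the download-then-unzip
    # pattern that DownloadUnzipTask automates; not the actual
    # implementation from tasks.base_tasks
    import os
    import subprocess

    def download_and_unzip(url, output_dir):
        zip_path = output_dir + '.zip'
        # Download the archive next to the target directory
        subprocess.check_call(['wget', '-O', zip_path, url])
        # Unzipping into the output directory marks the work as done:
        # a task whose output exists is complete and won't run again
        os.makedirs(output_dir)
        subprocess.check_call(['unzip', zip_path, '-d', output_dir])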
3. Import data into PostgreSQL
------------------------------

A lot of processing can be done in PostgreSQL quite easily.  We have
utility classes to bring both Shapefiles and CSVs into PostgreSQL
without much work.

For ``CSV2TempTableTask``, we only have to define an ``input_csv``
method that returns a path (or iterable of paths) to the CSV(s).  The
header row will automatically be checked and used to construct a schema
to bring the data in.

The standard ``requires`` method of Luigi is used here, too.  It means
the ``DownloadQCEW`` task for the same year must be run beforehand; the
``output`` from that task is then accessible as the ``input`` of this
one.

.. code:: python

    class RawQCEW(CSV2TempTableTask):

        year = IntParameter()

        def requires(self):
            return DownloadQCEW(year=self.year)

        def input_csv(self):
            return os.path.join(self.input().path,
                                '{}.q1-q4.singlefile.csv'.format(self.year))

Run the task.  If the table exists and has more than 0 rows, it will
not be run again.

.. code:: python

    current_session().rollback()
    raw_task = RawQCEW(year=2014)
    runtask(raw_task)

Confirm the task has completed successfully.

.. code:: python

    raw_task.complete()

.. parsed-literal::

    True

The session can be used to execute raw queries against the table.  The
output of a ``TempTableTask`` can be queried directly by using its
``table`` attribute, a string with the fully schema-qualified table
name.  We are guaranteed names that are unique to the
module/task/parameters without having to come up with any names
manually.

.. code:: python

    raw_task.output().table

.. parsed-literal::

    '"tmp".RawQCEW_2014_cfabf27024'

.. code:: python

    session = current_session()
    resp = session.execute('select count(*) from {}'.format(raw_task.output().table))
    resp.fetchall()

.. parsed-literal::

    [(14276508L,)]
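If you're curious about the schema ``CSV2TempTableTask`` inferred from
the header row, a standard ``information_schema`` query will show it.
This snippet is illustrative; the ``lower()`` is there because the
unquoted table name folds to lower case in PostgreSQL.

.. code:: python

    # Peek at the columns created from the CSV header (illustrative):
    # split the schema-qualified name apart and lowercase the table
    # part, since unquoted identifiers fold to lower case
    schema, table = raw_task.output().table.replace('"', '').split('.')
    resp = session.execute('''
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = '{schema}' AND table_name = '{table}'
    '''.format(schema=schema, table=table.lower()))
    resp.fetchall()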
4. Preprocess data in PostgreSQL
--------------------------------

QCEW data has a lot of rows we don't actually need -- these can be
filtered out easily in SQL.

For QCEW, the download files are annual, but contain quarterly time
periods.  Output tables should be limited to a single point in time.
We're also only interested in private employment (``own_code = '5'``)
and county-level aggregation by total (71), supersector (73), and NAICS
sector (74).

.. code:: python

    class SimpleQCEW(TempTableTask):

        year = IntParameter()
        qtr = IntParameter()

        def requires(self):
            return RawQCEW(year=self.year)

        def run(self):
            session = current_session()
            session.execute("CREATE TABLE {output} AS "
                            "SELECT * FROM {input} "
                            "WHERE agglvl_code IN ('74', '73', '71') "
                            "  AND year = '{year}' "
                            "  AND qtr = '{qtr}' "
                            "  AND own_code = '5' ".format(
                                input=self.input().table,
                                output=self.output().table,
                                year=self.year,
                                qtr=self.qtr,
                            ))

Run the task and confirm it completed.  We don't have to run each step
as we write it: requirements guarantee that anything required will be
run first.

.. code:: python

    simple_task = SimpleQCEW(year=2014, qtr=4)
    runtask(simple_task)
    simple_task.complete()

.. parsed-literal::

    True

.. code:: python

    simple_task.output().table

.. parsed-literal::

    '"tmp".SimpleQCEW_4_2014_79152e4934'

.. code:: python

    resp = session.execute('select count(*) from {}'.format(simple_task.output().table))
    resp.fetchall()

.. parsed-literal::

    [(97167L,)]
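As a quick, illustrative sanity check, we can also confirm that only
the three aggregation levels we kept survive the filter:

.. code:: python

    # Illustrative sanity check: only agglvl_codes '71', '73', and '74'
    # should remain after preprocessing
    resp = session.execute(
        'SELECT agglvl_code, COUNT(*) FROM {} GROUP BY agglvl_code'.format(
            simple_task.output().table))
    resp.fetchall()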
5. Write metadata
-----------------

We have to create metadata for the measures we're interested in from
QCEW.  Metadata tasks often don't take parameters, but this one does,
since we have to reorganize the table from one row per NAICS code to
one column per NAICS code, which is most easily done programmatically.

The ``ColumnsTask`` provides a structure for generating metadata.  The
only required method is ``columns``.  It must return an ``OrderedDict``
whose values are all ``OBSColumn`` and whose keys are all strings.  The
keys may be used as human-readable column names in tables based off
this metadata, although that is not always the case.  If the ``id`` of
the ``OBSColumn`` is left blank, the dict's key will be used to
generate it (qualified by the module).

Conventionally, there will also be a ``requires`` method that brings in
our standard tags: ``SectionTags``, ``SubsectionTags``, and
``UnitTags``.  This is an example of defining several tasks as
prerequisites: the outputs of those tasks will be accessible via
``self.input()`` in other methods.

.. code:: python

    from tasks.us.naics import (NAICS_CODES, is_supersector, is_sector,
                                get_parent_code)

    class QCEWColumns(ColumnsTask):

        naics_code = Parameter()

        def requires(self):
            requirements = {
                'sections': SectionTags(),
                'subsections': SubsectionTags(),
                'units': UnitTags(),
            }
            parent_code = get_parent_code(self.naics_code)
            if parent_code:
                requirements['parent'] = QCEWColumns(naics_code=parent_code)
            return requirements

        def columns(self):
            cols = OrderedDict()
            code, name, description = self.naics_code, NAICS_CODES[self.naics_code], ''

            # This gives us easier access to the tags we defined as dependencies
            input_ = self.input()
            units = input_['units']
            sections = input_['sections']
            subsections = input_['subsections']
            parent = input_.get('parent')
            cols['avg_wkly_wage'] = OBSColumn(
                # Make sure the column ID is unique within this module.
                # If left blank, it will be taken from this column's key
                # in the output OrderedDict
                id=underscore_slugify(u'avg_wkly_wage_{}'.format(code)),
                # The PostgreSQL type of this column.  Generally Numeric for
                # numbers and Text for categories.
                type='Numeric',
                # Human-readable name.  Will be used as header in the catalog
                name=u'Average weekly wage for {} establishments'.format(name),
                # Human-readable description.  Will be used as content in the catalog.
                description=u'Average weekly wage for a given quarter in the {name} industry (NAICS {code}). '
                            u'{name} is {description}.'.format(name=name, code=code, description=description),
                # Ranking of importance, sometimes used to favor certain measures
                # in auto-selection.  Weight of 0 will hide this column from the
                # user.  We generally use between 0 and 10
                weight=5,
                # How this measure was derived, for example "sum", "median",
                # "average", etc.  In cases of "sum", this means functions
                # downstream can construct estimates for arbitrary geographies
                aggregate='average',
                # Tags are our way of noting aspects of this measure like its
                # unit, the country it's relevant to, and which section(s) of
                # the catalog it should appear in.
                tags=[units['money'], sections['united_states'], subsections['income']],
            )
            cols['qtrly_estabs'] = OBSColumn(
                id=underscore_slugify(u'qtrly_estabs_{}'.format(code)),
                type='Numeric',
                name=u'Establishments in {}'.format(name),
                description=u'Count of establishments in a given quarter in the {name} industry (NAICS {code}). '
                            u'{name} is {description}.'.format(name=name, code=code, description=description),
                weight=5,
                aggregate='sum',
                tags=[units['businesses'], sections['united_states'], subsections['commerce_economy']],
                targets={parent['qtrly_estabs']: DENOMINATOR} if parent else {},
            )
            cols['month3_emplvl'] = OBSColumn(
                id=underscore_slugify(u'month3_emplvl_{}'.format(code)),
                type='Numeric',
                name=u'Employees in {} establishments'.format(name),
                description=u'Number of employees in the third month of a given quarter within the {name} '
                            u'industry (NAICS {code}). {name} is {description}.'.format(
                                name=name, code=code, description=description),
                weight=5,
                aggregate='sum',
                tags=[units['people'], sections['united_states'], subsections['employment']],
            )
            cols['lq_avg_wkly_wage'] = OBSColumn(
                id=underscore_slugify(u'lq_avg_wkly_wage_{}'.format(code)),
                type='Numeric',
                name=u'Average weekly wage location quotient for {} establishments'.format(name),
                description=u'Location quotient of the average weekly wage for a given quarter relative to '
                            u'the U.S. (Rounded to the hundredths place) within the {name} industry (NAICS {code}). '
                            u'{name} is {description}.'.format(name=name, code=code, description=description),
                weight=3,
                aggregate=None,
                tags=[units['ratio'], sections['united_states'], subsections['income']],
            )
            cols['lq_qtrly_estabs'] = OBSColumn(
                id=underscore_slugify(u'lq_qtrly_estabs_{}'.format(code)),
                type='Numeric',
                name=u'Location quotient of establishments in {}'.format(name),
                description=u'Location quotient of the quarterly establishment count relative to '
                            u'the U.S. (Rounded to the hundredths place) within the {name} industry (NAICS {code}). '
                            u'{name} is {description}.'.format(name=name, code=code, description=description),
                weight=3,
                aggregate=None,
                tags=[units['ratio'], sections['united_states'], subsections['commerce_economy']],
            )
            cols['lq_month3_emplvl'] = OBSColumn(
                id=underscore_slugify(u'lq_month3_emplvl_{}'.format(code)),
                type='Numeric',
                name=u'Employment level location quotient in {} establishments'.format(name),
                description=u'Location quotient of the employment level for the third month of a given quarter '
                            u'relative to the U.S. (Rounded to the hundredths place) within the {name} '
                            u'industry (NAICS {code}). {name} is {description}.'.format(
                                name=name, code=code, description=description),
                weight=3,
                aggregate=None,
                tags=[units['ratio'], sections['united_states'], subsections['employment']],
            )
            return cols

We should never run metadata tasks on their own -- they should be
defined as requirements of a ``TableTask``, below -- but it is possible
to do so, as an example.  NAICS code '1025' is the supersector for
education & health.

.. code:: python

    education_health_columns = QCEWColumns(naics_code='1025')
    runtask(education_health_columns)
    education_health_columns.complete()

.. parsed-literal::

    True

Output from a ``ColumnsTask`` is an ``OrderedDict`` with the columns
wrapped in ``ColumnTarget``\ s, which allow us to pass them around
without immediately committing them to the database.

.. code:: python

    education_health_columns.output()

.. parsed-literal::

    OrderedDict([('avg_wkly_wage', ),
                 ('qtrly_estabs', ),
                 ('month3_emplvl', ),
                 ('lq_avg_wkly_wage', ),
                 ('lq_qtrly_estabs', ),
                 ('lq_month3_emplvl', )])

Since we ran the task, we can check the ``OBSColumn`` table for
evidence that our metadata has been committed to disk.

.. code:: python

    [(col.id, col.name) for col in session.query(OBSColumn)[:5]]

.. parsed-literal::

    [(u'tmp.avg_wkly_wage_10',
      u'Average weekly wage for Total, all industries establishments'),
     (u'tmp.qtrly_estabs_10',
      u'Establishments in Total, all industries'),
     (u'tmp.month3_emplvl_10',
      u'Employees in Total, all industries establishments'),
     (u'tmp.lq_avg_wkly_wage_10',
      u'Average weekly wage location quotient for Total, all industries establishments'),
     (u'tmp.lq_qtrly_estabs_10',
      u'Location quotient of establishments in Total, all industries')]
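The tags we pulled in as requirements are committed alongside the
columns.  As an illustrative check, the ``OBSTag`` table (imported in
step 1) can be queried the same way:

.. code:: python

    # Illustrative: the section/subsection/unit tags we required should
    # now be present in the metadata tables as well
    [(tag.id, tag.name) for tag in session.query(OBSTag)[:5]]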
6. Populate output table
------------------------

Now that we have our data in a format similar to what we'll need, and
our metadata lined up, we can tie it together with a ``TableTask``.
Under the hood, ``TableTask`` handles the relational lifting between
columns and actual data, and assigns a hash number to the dataset.

Several methods must be overridden for ``TableTask`` to work:

- ``version()``: a version control number, which is useful for forcing
  a re-run/overwrite without having to track down and delete output
  artifacts.

- ``table_timespan()``: the timespan (for example, '2014', or '2012Q4')
  that identifies the date range or point-in-time for this table.

- ``columns()``: an ``OrderedDict`` of (colname, ``ColumnTarget``)
  pairs.  This should be constructed by pulling the desired columns
  from required ``ColumnsTask`` classes.

- ``populate()``: a method that populates the output table, most often
  via ``INSERT``.

.. code:: python

    # Since we have a column ('area_fips') that is a shared reference to
    # geometries ('geom_ref') we have to import that column.
    from tasks.us.census.tiger import GeoidColumns
    # get_timespan resolves a string like '2014Q4' into a timespan object
    # (import path per the Bigmetadata codebase)
    from lib.timespan import get_timespan

    class QCEW(TableTask):

        year = IntParameter()
        qtr = IntParameter()

        def version(self):
            return 1

        def requires(self):
            requirements = {
                'data': SimpleQCEW(year=self.year, qtr=self.qtr),
                'geoid_cols': GeoidColumns(),
                'naics': OrderedDict()
            }
            for naics_code, naics_name in NAICS_CODES.iteritems():
                # Only include the more general NAICS codes
                if is_supersector(naics_code) or is_sector(naics_code) \
                        or naics_code == '10':
                    requirements['naics'][naics_code] = QCEWColumns(naics_code=naics_code)
            return requirements

        def table_timespan(self):
            return get_timespan('{year}Q{qtr}'.format(year=self.year, qtr=self.qtr))

        def columns(self):
            # Here we assemble an OrderedDict using our requirements to
            # specify the columns that go into this table.
            input_ = self.input()
            cols = OrderedDict([
                ('area_fips', input_['geoid_cols']['county_geoid'])
            ])
            for naics_code, naics_cols in input_['naics'].iteritems():
                for key, coltarget in naics_cols.iteritems():
                    naics_name = NAICS_CODES[naics_code]
                    # The column name is built from the measure key, the
                    # NAICS code, and the NAICS name
                    colname = underscore_slugify(u'{}_{}_{}'.format(
                        key, naics_code, naics_name))
                    cols[colname] = coltarget
            return cols

        def populate(self):
            # This select statement transforms the input table, taking
            # advantage of our new column names.
            # The session is automatically committed if there are no errors.
            session = current_session()
            columns = self.columns()
            colnames = columns.keys()
            select_colnames = []
            for naics_code, naics_columns in self.input()['naics'].iteritems():
                for colname, coltarget in naics_columns.iteritems():
                    select_colnames.append('''MAX(CASE
                        WHEN industry_code = '{naics_code}' THEN {colname} ELSE NULL
                    END)::Numeric'''.format(naics_code=naics_code,
                                            colname=colname))
            insert = '''INSERT INTO {output} ({colnames})
                        SELECT area_fips, {select_colnames}
                        FROM {input}
                        GROUP BY area_fips '''.format(
                            output=self.output().table,
                            input=self.input()['data'].table,
                            colnames=', '.join(colnames),
                            select_colnames=', '.join(select_colnames),
                        )
            session.execute(insert)

Run the task and confirm it completed.  On a fresh database,
``complete()`` would return ``False`` before running; the task will not
run again once it has been run for this year and quarter combination.

.. code:: python

    table_task = QCEW(year=2014, qtr=4)
    runtask(table_task)
    table_task.complete()

.. parsed-literal::

    True

The table should exist in metadata, as well as in data, with all
relations well-defined.  Unlike the ``TempTableTask``\ s above, the
output of a ``TableTask`` is a Postgres table in the ``observatory``
schema, with a unique hash name.

.. code:: python

    table = table_task.output()
    table.table

.. parsed-literal::

    'observatory.obs_3dc49b70f71ed9bbf5b4a48773c860519af70e1e'
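As with columns, we can illustratively confirm that the table was
registered in the metadata model:

.. code:: python

    # Illustrative: at least one table should now be registered in the
    # OBSTable metadata table (imported in step 1)
    session.query(OBSTable).count()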
It's also possible to peek at the output data itself.  Each NAICS code
contributes six columns, so the row is wide; the values below are
grouped six per line accordingly.

.. code:: python

    session.execute('SELECT * FROM {} LIMIT 1'.format(table.table)).fetchall()

.. parsed-literal::

    [(u'01001',
      None, None, None, None, None, None,
      Decimal('395'), Decimal('5'), Decimal('144'), Decimal('0.65'), Decimal('0.52'), Decimal('0.68'),
      Decimal('609'), Decimal('80'), Decimal('1024'), Decimal('0.96'), Decimal('0.66'), Decimal('0.74'),
      Decimal('364'), Decimal('68'), Decimal('368'), Decimal('0.79'), Decimal('0.95'), Decimal('1.13'),
      Decimal('917'), Decimal('3'), Decimal('66'), Decimal('0.68'), Decimal('0.94'), Decimal('1.00'),
      Decimal('2317'), Decimal('5'), Decimal('103'), Decimal('1.89'), Decimal('3.26'), Decimal('2.45'),
      Decimal('914'), Decimal('77'), Decimal('426'), Decimal('1.17'), Decimal('1.16'), Decimal('0.90'),
      Decimal('1231'), Decimal('33'), Decimal('157'), Decimal('1.26'), Decimal('0.60'), Decimal('0.35'),
      Decimal('925'), Decimal('20'), Decimal('198'), Decimal('1.14'), Decimal('1.66'), Decimal('1.30'),
      Decimal('914'), Decimal('77'), Decimal('426'), Decimal('1.17'), Decimal('1.16'), Decimal('0.90'),
      Decimal('1225'), Decimal('30'), Decimal('1347'), Decimal('1.45'), Decimal('1.01'), Decimal('1.44'),
      Decimal('584'), Decimal('85'), Decimal('1168'), Decimal('0.93'), Decimal('0.65'), Decimal('0.73'),
      Decimal('904'), Decimal('91'), Decimal('380'), Decimal('0.98'), Decimal('0.61'), Decimal('0.25'),
      None, None, None, None, None, None,
      Decimal('433'), Decimal('149'), Decimal('1935'), Decimal('1.13'), Decimal('1.63'), Decimal('1.57'),
      Decimal('1225'), Decimal('30'), Decimal('1347'), Decimal('1.45'), Decimal('1.01'), Decimal('1.44'),
      Decimal('274'), Decimal('66'), Decimal('1432'), Decimal('1.10'), Decimal('1.13'), Decimal('1.50'),
      Decimal('301'), Decimal('8'), Decimal('66'), Decimal('0.53'), Decimal('0.68'), Decimal('0.44'),
      Decimal('620'), Decimal('15'), Decimal('127'), Decimal('0.97'), Decimal('0.73'), Decimal('0.36'),
      None, None, None, None, None, None,
      Decimal('929'), Decimal('17'), Decimal('132'), Decimal('2.13'), Decimal('1.91'), Decimal('1.53'),
      Decimal('677'), Decimal('768'), Decimal('8173'), Decimal('0.97'), Decimal('0.95'), Decimal('0.91'),
      Decimal('364'), Decimal('68'), Decimal('368'), Decimal('0.79'), Decimal('0.95'), Decimal('1.13'),
      Decimal('275'), Decimal('74'), Decimal('1498'), Decimal('0.94'), Decimal('1.06'), Decimal('1.35'),
      Decimal('584'), Decimal('202'), Decimal('2322'), Decimal('1.01'), Decimal('1.20'), Decimal('1.12'),
      Decimal('781'), Decimal('114'), Decimal('430'), Decimal('0.70'), Decimal('1.55'), Decimal('0.72'),
      Decimal('1201'), Decimal('7'), Decimal('36'), Decimal('1.02'), Decimal('0.52'), Decimal('0.17'),
      Decimal('0'), Decimal('3'), Decimal('0'), Decimal('0'), Decimal('0.57'), Decimal('0'),
      Decimal('860'), Decimal('62'), Decimal('190'), Decimal('0.69'), Decimal('0.62'), Decimal('0.29'),
      Decimal('0'), Decimal('26'), Decimal('0'), Decimal('0'), Decimal('0.59'), Decimal('0'),
      Decimal('1201'), Decimal('7'), Decimal('36'), Decimal('1.02'), Decimal('0.52'), Decimal('0.17'),
      None, None, None, None, None, None,
      Decimal('628'), Decimal('41'), Decimal('122'), Decimal('0.87'), Decimal('1.28'), Decimal('0.77'),
      Decimal('842'), Decimal('73'), Decimal('308'), Decimal('0.67'), Decimal('1.75'), Decimal('0.71'))]
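Outside the notebook sandbox, the same pipeline can be kicked off with
Luigi's standard command-line interface, which resolves and runs every
upstream requirement automatically.  The module path below is
hypothetical; substitute wherever this module actually lives in your
checkout.

.. code:: shell

    # Hypothetical module path, shown for illustration; Luigi runs all
    # upstream tasks (download, import, preprocess, metadata) as needed
    luigi --module tasks.us.bls QCEW --year 2014 --qtr 4 --local-scheduler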