...
 
Commits (5)
Changes
=======
v1.2 - july 7, 2018
-------------------
See closed issues in Milestone 1.2: https://github.com/geopython/stetl/milestone/8?closed=1
Most important changes are related to deployment in Docker and Kubernetes environments, dealing
with (env) variables, Stetl arguments and logging, for example:
- issue #71: Allow Environment vars to substitute/override config template arg-variables
- issue #72: Allow multiple -a args for Stetl main prog. Allowing multiple -a arguments allows
for more simpler overriding of for example default options.
- #68 Stetl should not output passwords and other particular data in its log
v1.1.1 - november 7, 2017
-------------------------
......
Metadata-Version: 1.1
Metadata-Version: 1.2
Name: Stetl
Version: 1.1
Summary: Stetl provides transformation for spatial data
Version: 1.2
Summary: Transformation and conversion framework (ETL) mainly for geospatial data
Home-page: http://github.com/geopython/stetl
Author: Just van den Broecke
Author-email: justb4@gmail.com
Maintainer: Just van den Broecke
Maintainer-email: justb4@gmail.com
License: GNU GPL v3
Description: # Stetl - Streaming ETL
......@@ -12,7 +14,7 @@ Description: # Stetl - Streaming ETL
[![Build Status](https://travis-ci.org/geopython/stetl.png)](https://travis-ci.org/geopython/stetl)
[![Documentation Status](https://img.shields.io/badge/docs-latest-brightgreen.svg)](http://stetl.readthedocs.org/en/latest)
[![Gitter Chat](http://img.shields.io/badge/chat-online-brightgreen.svg)](https://gitter.im/justb4/stetl)
[![Gitter Chat](http://img.shields.io/badge/chat-online-brightgreen.svg)](https://gitter.im/geopython/stetl)
Notice: the Stetl GH repo is now at the [GeoPython GH organization](https://github.com/geopython).
......@@ -54,7 +56,7 @@ Description: # Stetl - Streaming ETL
Most of the data conversions within the [Dutch NLExtract Project](https://github.com/nlextract/NLExtract) apply Stetl.
Stetl also proved to be very effective in [IoT-related transformations involving the SensorWeb/SOS](https://github.com/Geonovum/smartemission).
Stetl also proved to be very effective in [IoT-related transformations involving the SensorWeb/SOS](https://github.com/smartemission).
## Examples
......@@ -63,7 +65,7 @@ Description: # Stetl - Streaming ETL
## Installation
Stetl can be installed via PyPi `pip install stetl` and recently as a [Stetl Docker image](https://hub.docker.com/r/justb4/stetl).
Stetl can be installed via PyPi `pip install stetl` and recently as a [Stetl Docker image](https://hub.docker.com/r/geopython/stetl).
More on [installation in the documentation](http://www.stetl.org/en/latest/install.html).
## Contributing
......@@ -80,7 +82,7 @@ Description: # Stetl - Streaming ETL
Stetl originated in the INSPIRE-FOSS project: [2009-2013 now archived](https://github.com/justb4/inspire-foss).
Since then Stetl evolved into a wider use like
transforming [Dutch GML-based Open Datasets](https://github.com/nlextract/NLExtract) such as IMGEO/BGT (Large Scale Topography)
and IMKAD/BRK (Cadastral Data).
and IMKAD/BRK (Cadastral Data) and [Sensor Data Transformation and Calibration](https://github.com/smartemission/docker-se-stetl).
## Finally
......@@ -96,6 +98,19 @@ Description: # Stetl - Streaming ETL
Changes
=======
v1.2 - july 7, 2018
-------------------
See closed issues in Milestone 1.2: https://github.com/geopython/stetl/milestone/8?closed=1
Most important changes are related to deployment in Docker and Kubernetes environments, dealing
with (env) variables, Stetl arguments and logging, for example:
- issue #71: Allow Environment vars to substitute/override config template arg-variables
- issue #72: Allow multiple -a args for Stetl main prog. Allowing multiple -a arguments allows
for more simpler overriding of for example default options.
- #68 Stetl should not output passwords and other particular data in its log
v1.1.1 - november 7, 2017
-------------------------
......@@ -206,9 +221,9 @@ Description: # Stetl - Streaming ETL
Thanks to Tom Kralidis for helping out to move from personal repo to https://github.com/geopython organization.
Keywords: etl xsl gdal gis vector feature data
Keywords: etl xsl gdal gis vector feature data gml xml
Platform: UNKNOWN
Classifier: Development Status :: 4 - Beta
Classifier: Development Status :: 5 - Production/Stable
Classifier: Environment :: Console
Classifier: Intended Audience :: Developers
Classifier: Intended Audience :: Science/Research
......
......@@ -4,7 +4,7 @@ Stetl, streaming ETL, pronounced "staedl", is a lightweight ETL-framework for ge
[![Build Status](https://travis-ci.org/geopython/stetl.png)](https://travis-ci.org/geopython/stetl)
[![Documentation Status](https://img.shields.io/badge/docs-latest-brightgreen.svg)](http://stetl.readthedocs.org/en/latest)
[![Gitter Chat](http://img.shields.io/badge/chat-online-brightgreen.svg)](https://gitter.im/justb4/stetl)
[![Gitter Chat](http://img.shields.io/badge/chat-online-brightgreen.svg)](https://gitter.im/geopython/stetl)
Notice: the Stetl GH repo is now at the [GeoPython GH organization](https://github.com/geopython).
......@@ -46,7 +46,7 @@ of GML/XML-based National geo-datasets to for example PostGIS.
Most of the data conversions within the [Dutch NLExtract Project](https://github.com/nlextract/NLExtract) apply Stetl.
Stetl also proved to be very effective in [IoT-related transformations involving the SensorWeb/SOS](https://github.com/Geonovum/smartemission).
Stetl also proved to be very effective in [IoT-related transformations involving the SensorWeb/SOS](https://github.com/smartemission).
## Examples
......@@ -55,7 +55,7 @@ Best is to start with the [basic examples](examples/basics)
## Installation
Stetl can be installed via PyPi `pip install stetl` and recently as a [Stetl Docker image](https://hub.docker.com/r/justb4/stetl).
Stetl can be installed via PyPi `pip install stetl` and recently as a [Stetl Docker image](https://hub.docker.com/r/geopython/stetl).
More on [installation in the documentation](http://www.stetl.org/en/latest/install.html).
## Contributing
......@@ -72,7 +72,7 @@ review the [guidelines for contributing](CONTRIBUTING.md).
Stetl originated in the INSPIRE-FOSS project: [2009-2013 now archived](https://github.com/justb4/inspire-foss).
Since then Stetl evolved into a wider use like
transforming [Dutch GML-based Open Datasets](https://github.com/nlextract/NLExtract) such as IMGEO/BGT (Large Scale Topography)
and IMKAD/BRK (Cadastral Data).
and IMKAD/BRK (Cadastral Data) and [Sensor Data Transformation and Calibration](https://github.com/smartemission/docker-se-stetl).
## Finally
......
1.1
\ No newline at end of file
1.2
\ No newline at end of file
......@@ -8,6 +8,7 @@
from stetl.main import parse_args
from stetl.etl import ETL
from stetl.util import Util
import sys
log = Util.get_log('main')
......@@ -18,12 +19,12 @@ def main():
Args:
-c --config <config_file> the Stetl config file.
-s --section <section_name> the section in the Stetl config (ini) file to execute (default is [etl]).
-a --args <arglist> substitutable args for symbolic, {arg}, values in Stetl config file, in format "arg1=foo arg2=bar" etc.
-a --args <arglist> zero or more substitutable args for symbolic, {arg}, values in Stetl config file, in format -a arg1=foo -a arg2=bar etc.
-h --help <subject> Get component documentation like its configuration parameters, e.g. stetl doc stetl.inputs.fileinput.FileInput
"""
args = parse_args()
args = parse_args(sys.argv[1:])
if args.config_file:
# Do the ETL
......
python-stetl (1.1+ds-3) UNRELEASED; urgency=medium
python-stetl (1.2+ds-1) unstable; urgency=medium
* New upstream release.
* Update copyright-format URL to use HTTPS.
* Fix deprecated source override location.
* Update Vcs-* URLs for Salsa.
......@@ -7,8 +8,9 @@ python-stetl (1.1+ds-3) UNRELEASED; urgency=medium
* Add module import tests to autopkgtest configuration.
* Add override for dependency-on-python-version-marked-for-end-of-life.
* Strip trailing whitespace from control & rules files.
* Drop reproducible_build.patch, applied upstream.
-- Bas Couwenberg <sebastic@debian.org> Sun, 21 Jan 2018 10:42:54 +0100
-- Bas Couwenberg <sebastic@debian.org> Sat, 07 Jul 2018 19:30:07 +0200
python-stetl (1.1+ds-2) unstable; urgency=medium
......
Description: Make the build reproducible
Author: Chris Lamb <lamby@debian.org>
Last-Update: 2017-11-08
Bug-Debian: https://bugs.debian.org/881217
Forwarded: https://github.com/geopython/stetl/pull/64
--- a/stetl/filters/templatingfilter.py
+++ b/stetl/filters/templatingfilter.py
@@ -139,7 +139,7 @@ class Jinja2TemplatingFilter(TemplatingF
# Applying Decorator pattern with the Config class to provide
# read-only config values from the configured properties.
- @Config(ptype=str, default=[os.getcwd()], required=False)
+ @Config(ptype=str, default=None, required=False)
def template_search_paths(self):
"""
List of directories where to search for templates, default is current working directory only.
@@ -207,7 +207,7 @@ class Jinja2TemplatingFilter(TemplatingF
raise e
# Load and Init Template once
- loader = FileSystemLoader(self.template_search_paths)
+ loader = FileSystemLoader(self.template_search_paths or [os.getcwd()])
self.jinja2_env = Environment(loader=loader, extensions=['jinja2.ext.do'], lstrip_blocks=True, trim_blocks=True)
# Register additional Filters on the template environment by updating the filters dict:
reproducible_build.patch
......@@ -111,6 +111,10 @@ Components: Filters
:members:
:show-inheritance:
.. automodule:: stetl.filters.sieve
:members:
:show-inheritance:
.. automodule:: stetl.filters.stringfilter
:members:
:show-inheritance:
......@@ -131,6 +135,30 @@ Components: Filters
:members:
:show-inheritance:
.. automodule:: stetl.filters.execfilter
:members:
:show-inheritance:
.. automodule:: stetl.filters.nullfilter
:members:
:show-inheritance:
.. automodule:: stetl.filters.packetbuffer
:members:
:show-inheritance:
.. automodule:: stetl.filters.packetwriter
:members:
:show-inheritance:
.. automodule:: stetl.filters.regexfilter
:members:
:show-inheritance:
.. automodule:: stetl.filters.zipfileextractor
:members:
:show-inheritance:
Components: Outputs
-------------------
......
......@@ -52,9 +52,9 @@ copyright = u'2013+, Just van den Broecke'
# built documents.
#
# The short X.Y version.
version = '1.1'
version = '1.2-dev'
# The full version, including alpha/beta/rc tags.
release = '1.1'
release = '1.2-dev'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
......
......@@ -9,5 +9,4 @@ All development is done via GitHub: see https://github.com/geopython/stetl.
Contact the main author Just van den Broecke via email at just@justobjects.nl.
Online chat via Gitter: https://gitter.im/geopython/stetl
......@@ -234,9 +234,15 @@ or even OGC WPS servers (planned).
Reusable Stetl Configs
----------------------
What we saw in the last example is that it is hard to reuse this `etl.cfg` when we have for example a different input file
or want to map to different output files. For this Stetl supports `parameter substitution`. Here command line parameters are substituted
for variables in `etl.cfg`. A variable is declared between curly brackets like `{out_xml}`. See
What we saw in the last example is that it is hard to reuse this `etl.cfg`
when we have for example a different input file
or want to map to different output files.
For this Stetl supports `config parameter substitution`.
Dynamic or secret (e.g. database credentials) parameters in `etl.cfg` are declared
symbolically and substituted at runtime via the commandline or the OS environment.
A variable is declared between curly brackets like `{out_xml}`. See
example `6_cmdargs <https://github.com/geopython/stetl/tree/master/examples/basics/6_cmdargs>`_. ::
[etl]
......@@ -254,12 +260,13 @@ example `6_cmdargs <https://github.com/geopython/stetl/tree/master/examples/basi
class = outputs.fileoutput.FileOutput
file_path = {out_xml}
Note the symbolic input, xsl and output files. We can now perform this ETL using the `stetl -a option` in two ways.
Note the symbolic input, xsl and output files. We can now perform
the ETL using the `stetl -a option` in two basic ways.
One, passing the arguments on the commandline, like ::
stetl -c etl.cfg -a "in_xml=input/cities.xml in_xsl=cities2gml.xsl out_xml=output/gmlcities.gml"
Two, passing the arguments in a properties file, here called `etl.args` (the name of the suffix .args is not significant). ::
Two, passing the arguments in a properties file, here called `etl.args` (the name of the suffix .args is not significant, could be .env as well). ::
stetl -c etl.cfg -a etl.args
......@@ -270,7 +277,41 @@ Where the content of the `etl.args` properties file is: ::
in_xsl=cities2gml.xsl
out_xml=output/gmlcities.gml
This makes an ETL chain highly reusable. A very elaborate Stetl config with parameter substitution can be seen in the
It is also possible to specify **multiple -a arguments**. This provides for situations
where a `default.args` contains all default arguments and a `my.args` or explicit `-a` settings
that override the default values in `default.args`. Overriding is determined by the order of
the `-a` arguments. Examples: ::
stetl -c etl.cfg -a default.args -a my.args
stetl -c etl.cfg -a default.args -a "db_user=docker db_password=pass"
stetl -c etl.cfg -a default.args -a db_user=docker -a db_password=pass
It is also possible to pass these key/value pairs via OS Environment variables.
This is especially handy in Docker-based deployments like Docker Compose and Kubernetes.
In this case the variable names need to be prepended with `STETL_` or `stetl_` as
to not mix-up with other non-related OS-env vars. A mixture of commandline args (file)
and environment vars is possible. The rule is that
*OS Environment variables always override/overrule arguments specified with -a option(s)*.
For example, the above args could also be passed as follows: ::
export stetl_in_xml="input/cities.xml"
export stetl_in_xsl="cities2gml.xsl"
export stetl_out_xml="output/gmlcities.gml"
stetl -c etl.cfg
or only override the input file name `in_xml` from `etl.args`: ::
export stetl_in_xml="input/cities2.xml"
stetl -c etl.cfg -a etl.args
or even with multiple `-a args`: ::
export stetl_in_xml="input/cities2.xml"
stetl -c etl.cfg -a etl.args -a my.args
This makes an ETL chain highly reusable.
A very elaborate Stetl config with parameter substitution can be seen in the
`Top10NL ETL <https://github.com/geopython/stetl/blob/master/examples/top10nl/etl-top10nl.cfg>`_.
Connection Compatibility
......
# Trivial example Sieve filter.
# The input data is in input/cities.csv.
# We sieve out (passthrough) all records where city attr value
# matches "amsterdam" or "otterlo".
[etl]
chains = input_csv|attr_value_sieve|output_std,
input_csv|attr_value_sieve|output_file
[input_csv]
class = inputs.fileinput.CsvFileInput
file_path = input/cities.csv
output_format = record_array
[attr_value_sieve]
class = filters.sieve.AttrValueRecordSieve
input_format = record_array
output_format = record_array
attr_name = city
attr_values = amsterdam,otterlo
[output_std]
class = outputs.standardoutput.StandardOutput
[output_file]
class = outputs.fileoutput.FileOutput
file_path = output/cities_sieved.txt
#!/bin/sh
#
# ETL for copying a file to standard output.
#
# Shortcut to call Stetl main.py with etl config.
#
stetl -c etl.cfg
city,lat,lon
amsterdam,52.4,4.9
otterlo,52.101,5.773
rotterdam,51.9,4.5
eindhoven,51.44,5.47
[{'lat': '52.4', 'city': 'amsterdam', 'lon': '4.9'}, {'lat': '52.101', 'city': 'otterlo', 'lon': '5.773'}]
\ No newline at end of file
......@@ -4,6 +4,10 @@
#
# Author: Just van den Broecke
#
stetl -c etl.cfg
stetl=stetl
PYTHONPATH=${PYTHONPATH}:../../..
# stetl=../../../stetl/main.py
$stetl -c etl.cfg
......@@ -14,3 +14,6 @@ stetl -c etl.cfg -a "in_xml=input/cities.xml in_xsl=cities2gml.xsl out_xml=outp
# Option 2: using a properties file
stetl -c etl.cfg -a etl.args
# Option 3: multiple -a options e.g. overriding one or more default args (file)
stetl -c etl.cfg -a etl.args -a "in_xml=input/amsterdam.xml"
<?xml version='1.0' encoding='utf-8'?>
<cities>
<city>
<name>Amsterdam</name>
<lat>52.4</lat>
<lon>4.9</lon>
</city>
</cities>
......@@ -22,24 +22,4 @@
</ogr:geometry>
</ogr:City>
</gml:featureMember>
<gml:featureMember>
<ogr:City>
<ogr:name>Bonn</ogr:name>
<ogr:geometry>
<gml:Point srsName="urn:ogc:def:crs:EPSG:4326">
<gml:coordinates>50.7,7.1</gml:coordinates>
</gml:Point>
</ogr:geometry>
</ogr:City>
</gml:featureMember>
<gml:featureMember>
<ogr:City>
<ogr:name>Rome</ogr:name>
<ogr:geometry>
<gml:Point srsName="urn:ogc:def:crs:EPSG:4326">
<gml:coordinates>41.9,12.5</gml:coordinates>
</gml:Point>
</ogr:geometry>
</ogr:City>
</gml:featureMember>
</ogr:FeatureCollection>
This source diff could not be displayed because it is too large. You can view the blob instead.
......@@ -6,5 +6,4 @@ verbosity = 3
[egg_info]
tag_build =
tag_date = 0
tag_svn_revision = 0
......@@ -38,9 +38,9 @@ with open('requirements-main.txt') as f:
setup(
name='Stetl',
version=version,
description="Stetl provides transformation for spatial data",
description="Transformation and conversion framework (ETL) mainly for geospatial data",
license='GNU GPL v3',
keywords='etl xsl gdal gis vector feature data',
keywords='etl xsl gdal gis vector feature data gml xml',
author='Just van den Broecke',
author_email='justb4@gmail.com',
maintainer='Just van den Broecke',
......@@ -56,7 +56,7 @@ setup(
tests_require=['nose'],
test_suite='nose.collector',
classifiers=[
'Development Status :: 4 - Beta',
'Development Status :: 5 - Production/Stable',
'Environment :: Console',
'Intended Audience :: Developers',
'Intended Audience :: Science/Research',
......
......@@ -5,6 +5,8 @@
# Author: Just van den Broecke
#
import os
import sys
from time import time
from util import Util, ConfigSection
from packet import FORMAT
......@@ -122,6 +124,10 @@ class Component(object):
self.cfg_vals = dict()
self.next = None
self.section = section
self._max_time = -1
self._min_time = sys.maxint
self._total_time = 0
self._invoke_count = 0
# First assume single output provided by derived class
self._output_format = produces
......@@ -184,10 +190,14 @@ class Component(object):
# Current processor of packet
packet.component = self
start_time = self.timer_start()
self._invoke_count += 1
# Do something with the data
result = self.before_invoke(packet)
if result is False:
# Component indicates it does not want the chain to proceed
self.timer_stop(start_time)
return packet
# Do component-specific processing, e.g. read or write or filter
......@@ -196,8 +206,11 @@ class Component(object):
result = self.after_invoke(packet)
if result is False:
# Component indicates it does not want the chain to proceed
self.timer_stop(start_time)
return packet
self.timer_stop(start_time)
# If there is a next component, let it process
if self.next:
# Hand-over data (line, doc whatever) to the next component
......@@ -219,6 +232,17 @@ class Component(object):
# Notify all comps that we exit
self.exit()
# Simple performance stats in one line (issue #77)
# Calc average processing time, watch for 0 invoke-case
avg_time = 0.0
if self._invoke_count > 0:
avg_time = self._total_time / self._invoke_count
log.info("%s invokes=%d time(total, min, max, avg) = %.3f %.3f %.3f %.3f" %
(self.__class__.__name__, self._invoke_count,
self._total_time, self._min_time, self._max_time,
avg_time))
# If there is a next component, let it do its exit()
if self.next:
self.next.do_exit()
......@@ -258,3 +282,23 @@ class Component(object):
Allows derived Components to perform a one-time exit/cleanup.
"""
pass
def timer_start(self):
return time()
def timer_stop(self, start_time):
"""
Collect and calculate per-Component performance timing stats.
:param start_time:
:return:
"""
delta_time = time() - start_time
# Calc timing stats for Component invocation
self._total_time += delta_time
if delta_time > self._max_time:
self._max_time = delta_time
if delta_time < self._min_time and '%.3f' % delta_time != '0.000':
self._min_time = delta_time
......@@ -5,6 +5,7 @@
# Author: Just van den Broecke
#
import os
import re
import sys
from ConfigParser import ConfigParser
import version
......@@ -50,29 +51,86 @@ class ETL:
sys.path.append(ETL.CONFIG_DIR)
config_str = ''
try:
# Get config file as string
log.info("Reading config_file = %s" % config_file)
f = open(config_file, 'r')
config_str = f.read()
f.close()
except Exception as e:
log.error("Cannot read config file: err=%s" % str(e))
raise e
args_names = list()
try:
# Optional: expand symbolic arguments from args_dict and or OS Env
# ignore errors here as { .. } may appear at random.
# Parse unique list of argument names from config file string.
# https://www.machinelearningplus.com/python/python-regex-tutorial-examples/
args_names = list(set(re.findall('{[A-Z|a-z]\w+}', config_str)))
args_names = [name.split('{')[1].split('}')[0] for name in args_names]
# Optional: expand from equivalent env vars
args_dict = self.env_expand_args_dict(args_dict, args_names)
# In general all arg names should be present in args dict
for args_name in args_names:
if args_name not in args_dict:
log.warn("Arg not found in args nor environment: name=%s" % args_name)
# raise Exception("name=%s" % args_name)
except Exception as e:
log.warn("Expanding config arguments (non fatal yet): %s" % str(e))
try:
if args_dict:
log.info("Substituting %d args in config file from args_dict: %s" % (len(args_dict), str(args_dict)))
# Get config file as string
f = open(config_file, 'r')
config_str = f.read()
f.close()
log.info("Substituting %d args in config file from args_dict: %s" % (len(args_names), str(args_names)))
# Do replacements see http://docs.python.org/2/library/string.html#formatstrings
# and render substituted config string
config_str = config_str.format(**args_dict)
log.info("Substituting args OK")
# Put Config string into buffer (readfp() needs a readline() method)
config_buf = StringIO.StringIO(config_str)
# Parse config from file buffer
self.configdict.readfp(config_buf, config_file)
else:
# Parse config file directly
self.configdict.read(config_file)
except Exception as e:
log.error("Error substituting config arguments: err=%s" % str(e))
raise e
try:
# Put Config string into buffer (readfp() needs a readline() method)
config_buf = StringIO.StringIO(config_str)
# Parse config from file buffer
self.configdict.readfp(config_buf, config_file)
except Exception as e:
log.error("Fatal Error reading config file: err=%s" % str(e))
log.error("Error populating config dict from config string: err=%s" % str(e))
raise e
def env_expand_args_dict(self, args_dict, args_names):
"""
Expand values in dict with equivalent values from the
OS Env. NB vars in OS Env should be prefixed with `STETL_` or `stetl_`
as to get overrides by accident.
:return: expanded args_dict or None
"""
env_dict = os.environ
for name in env_dict:
args_key = '_'.join(name.split('_')[1:])
if name.lower().startswith('stetl_') and args_key in args_names:
# Get real key, e.g. "STETL_HOST" becomes "HOST"
# "stetl_host" becomes "host".
args_value = env_dict[name]
if not args_dict:
args_dict = dict()
# Set: optionally override any existing value
args_dict[args_key] = args_value
log.info("Set/override from env var: %s" % name)
return args_dict
def run(self):
# The main ETL processing
......
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Executes the given command and returns the captured output.
#
# Author: Frank Steggink
#
import subprocess
import os
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT
log = Util.get_log('execfilter')
class ExecFilter(Filter):
"""
Executes any command (abstract base class).
"""
@Config(ptype=str, default='', required=False)
def env_args(self):
"""
Provides of list of environment variables which will be used when executing the given command.
Example: env_args = pgpassword=postgres othersetting=value~with~spaces
"""
pass
@Config(ptype=str, default='=', required=False)
def env_separator(self):
"""
Provides the separator to split the environment variable names from their values.
"""
pass
def __init__(self, configdict, section, consumes, produces):
Filter.__init__(self, configdict, section, consumes, produces)
def invoke(self, packet):
return packet
def execute_cmd(self, cmd):
env_vars = Util.string_to_dict(self.env_args, self.env_separator)
old_environ = os.environ.copy()
try:
os.environ.update(env_vars)
log.info("executing cmd=%s" % cmd)
result = subprocess.check_output(cmd, shell=True)
log.info("execute done")
return result
finally:
os.environ = old_environ
class CommandExecFilter(ExecFilter):
"""
Executes an arbitrary command and captures the output
consumes=FORMAT.string, produces=FORMAT.string
"""
def __init__(self, configdict, section):
ExecFilter.__init__(self, configdict, section, consumes=FORMAT.string, produces=FORMAT.string)
def invoke(self, packet):
if packet.data is not None:
packet.data = self.execute_cmd(packet.data)
return packet
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Extracts data from a string using a regular expression and generates a record.
#
# Author: Frank Steggink
from stetl.component import Config
from stetl.filter import Filter
from stetl.packet import FORMAT
from stetl.util import Util
import re
log = Util.get_log("regexfilter")
class RegexFilter(Filter):
"""
Extracts data from a string using a regular expression and returns the named groups as a record.
consumes=FORMAT.string, produces=FORMAT.record
"""
# Start attribute config meta
# Applying Decorator pattern with the Config class to provide
# read-only config values from the configured properties.
@Config(ptype=str, default=None, required=True)
def pattern_string(self):
"""
Regex pattern string. Should contain named groups.
"""
pass
# End attribute config meta
# Constructor
def __init__(self, configdict, section, consumes=FORMAT.string, produces=FORMAT.record):
Filter.__init__(self, configdict, section, consumes, produces)
self.regex_object = re.compile(self.pattern_string, re.S)
def init(self):
log.info('Init: regex filter')
if self.pattern_string is None:
# If no pattern_string is present:
err_s = 'The pattern_string needs to be configured'
log.error(err_s)
raise ValueError('The pattern_string needs to be configured')
def exit(self):
log.info('Exit: regex filter')
def invoke(self, packet):
if packet.data is None:
return packet
m = self.regex_object.match(packet.data)
if m is not None:
packet.data = m.groupdict()
else:
packet.data = {}
return packet
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Lets data Packets pass-through, "sieve", based on criteria in their data.
# See issue: https://github.com/geopython/stetl/issues/78
#
# A concrete example is AttrValueRecordSieve which sieves records matching
# specific attribute values. One can also think of Sieves based on XPath expressions
# (e.g. for XML, GML), or geospatial, based on for example WFS-like filters like bounding boxes.
#
# Author: Just van den Broecke
#
from stetl.component import Config
from stetl.filter import Filter
from stetl.util import Util
from stetl.packet import FORMAT
log = Util.get_log('Sieve')
class Sieve(Filter):
"""
ABC for specific Sieves that pass-through, "sieve", Packets based on criteria in their data.
"""
def __init__(self, configdict, section, consumes, produces):
Filter.__init__(self, configdict, section, consumes, produces)
def invoke(self, packet):
if packet.data is None:
return packet
return self.sieve(packet)
def sieve(self, packet):
"""
To be implemented in subclasses.
:param packet:
:return:
"""
return packet
class AttrValueRecordSieve(Sieve):
"""
Sieves by attr/value(s) in Record Packets.
"""
@Config(ptype=str, required=True)
def attr_name(self):
"""
Name of attribute whose value(s) are to be sieved.
"""
pass
@Config(ptype=list, default=list(), required=False)
def attr_values(self):
"""
Value(s) for attribute to be to sieved. If empty any value is passed through (existence
of attr_name is criterium).
"""
pass
def __init__(self, configdict, section):
Sieve.__init__(self, configdict, section, consumes=[FORMAT.record_array, FORMAT.record], produces=[FORMAT.record_array, FORMAT.record])
def sieve(self, packet):
"""
Filter out Packets that are not matching designated attr value(s).
:param packet:
:return:
"""
# Start with empty result: fill with matching records
record_data = packet.data
packet.data = None
# Data can be list or single record
if type(record_data) is list:
packet.data = list()
for record in record_data:
if self.matches_attr(record):
packet.data.append(record)
elif type(record_data) is dict:
if self.matches_attr(record_data):
packet.data = record_data
return packet
def matches_attr(self, record):
# Attr not even in record: no use going on
if self.attr_name not in record:
return False
# Match if no value
if not self.attr_values:
return True
return record[self.attr_name] in self.attr_values
......@@ -4,6 +4,7 @@
#
# Author:Just van den Broecke
from stetl.component import Config
from stetl.util import Util
from stetl.filter import Filter
from stetl.packet import FORMAT
......@@ -38,17 +39,29 @@ class StringSubstitutionFilter(StringFilter):
consumes=FORMAT.string, produces=FORMAT.string
"""
@Config(ptype=str, default=None, required=True)
def format_args(self):
"""
Provides a list of format arguments used by the string substitution filter. Formatting of content according to Python String.format().
String should have substitutable values like {schema} {foo}.
Example: format_args = schema:test foo:bar
"""
pass
@Config(ptype=str, default=':', required=False)
def separator(self):
"""
Provides the separator to split the format argument names from their values.
"""
pass
# Constructor
def __init__(self, configdict, section):
StringFilter.__init__(self, configdict, section, consumes=FORMAT.string, produces=FORMAT.string)
# Formatting of content according to Python String.format()
# String should have substitutable values like {schema} {foo}
# format_args should be of the form format_args = schema:test foo:bar ...
self.format_args = self.cfg.get('format_args')
# Convert string to dict: http://stackoverflow.com/a/1248990
self.format_args_dict = Util.string_to_dict(self.format_args, ':')
self.format_args_dict = Util.string_to_dict(self.format_args, self.separator)
def filter_string(self, packet):
# String substitution based on Python String.format()
......
......@@ -94,6 +94,15 @@ class StringTemplatingFilter(TemplatingFilter):
consumes=FORMAT.record or FORMAT.record_array, produces=FORMAT.string
"""
@Config(ptype=bool, default=False, required=False)
def safe_substitution(self):
"""
Apply safe substitution? With this method, string.Template.safe_substitute will be invoked, instead of
string.Template.substitute. If placeholders are missing from mapping and keywords, instead of raising an
exception, the original placeholder will appear in the resulting string intact.
"""
pass
def __init__(self, configdict, section):
TemplatingFilter.__init__(self, configdict, section, consumes=[FORMAT.record, FORMAT.record_array])
......@@ -111,10 +120,16 @@ class StringTemplatingFilter(TemplatingFilter):
self.template = Template(self.template_string)
def render_template(self, packet):
if type(packet.data) is list:
packet.data = [self.template.substitute(item) for item in packet.data]
if self.safe_substitution:
if type(packet.data) is list:
packet.data = [self.template.safe_substitute(item) for item in packet.data]
else:
packet.data = self.template.safe_substitute(packet.data)
else:
packet.data = self.template.substitute(packet.data)
if type(packet.data) is list:
packet.data = [self.template.substitute(item) for item in packet.data]
else:
packet.data = self.template.substitute(packet.data)
return packet
......@@ -139,7 +154,7 @@ class Jinja2TemplatingFilter(TemplatingFilter):
# Applying Decorator pattern with the Config class to provide
# read-only config values from the configured properties.
@Config(ptype=str, default=[os.getcwd()], required=False)
@Config(ptype=str, default=None, required=False)
def template_search_paths(self):
"""
List of directories where to search for templates, default is current working directory only.
......@@ -207,7 +222,7 @@ class Jinja2TemplatingFilter(TemplatingFilter):
raise e
# Load and Init Template once
loader = FileSystemLoader(self.template_search_paths)
loader = FileSystemLoader(self.template_search_paths or [os.getcwd()])
self.jinja2_env = Environment(loader=loader, extensions=['jinja2.ext.do'], lstrip_blocks=True, trim_blocks=True)
# Register additional Filters on the template environment by updating the filters dict:
......
......@@ -112,9 +112,9 @@ class XmlElementReader(Filter):
if self.strip_namespaces:
packet.data = Util.stripNamespaces(elem).getroot()
# Clear the root element, since iterparse still builds a tree
# See http://effbot.org/zone/element-iterparse.htm
self.root.clear()
# Clear the element which has been read. Don't clear the root document,
# since the last element hasn't been processed yet.
elem.clear()
# If there is a next component, let it process
if self.next:
......
......@@ -29,6 +29,13 @@ class ZipFileExtractor(Filter):
"""
pass
@Config(ptype=bool, default=True, required=False)
def delete_file(self):
"""
Delete the file when the chain has been completed?
"""
pass
# End attribute config meta
# Constructor
......@@ -58,7 +65,7 @@ class ZipFileExtractor(Filter):
def after_chain_invoke(self, packet):
import os.path
if os.path.isfile(self.cur_file_path):
if os.path.isfile(self.cur_file_path) and self.delete_file:
os.remove(self.cur_file_path)
return True
......@@ -499,7 +499,12 @@ class ApacheLogFileInput(FileInput):
return packet
# Parse logfile line into record (dict)
packet.data = self.parser.parse(line)
packet.data = None
try:
packet.data = self.parser.parse(line)
except Exception as e:
log.warn("Cannot parse e=%s" % str(e))
return packet
......
......@@ -12,11 +12,12 @@ from version import __version__
import argparse # apt-get install python-argparse
import inspect
import os
import sys
log = Util.get_log('main')
def parse_args():
def parse_args(args_list):
log.info("Stetl version = %s" % __version__)
argparser = argparse.ArgumentParser(description='Invoke Stetl')
......@@ -27,22 +28,27 @@ def parse_args():
dest='config_section', required=False)
argparser.add_argument('-a ', '--args', type=str,
help='Arguments or .properties file to be substituted for {argN}s in config file, as -a "arg1=foo arg2=bar" or -a args.properties',
dest='config_args', required=False)
help='Arguments or .properties files to be substituted for symbolic {argN}s in Stetl config file,\
as -a "arg1=foo arg2=bar" and/or -a args.properties, multiple -a options are possible',
dest='config_args', required=False, action='append')
argparser.add_argument('-d ', '--doc', type=str,
help='Get component documentation like its configuration parameters, e.g. stetl doc stetl.inputs.fileinput.FileInput',
dest='doc_args', required=False)
args = argparser.parse_args()
args = argparser.parse_args(args_list)
if args.config_args:
if os.path.isfile(args.config_args):
log.info('Found args file at: %s' % args.config_args)
args.config_args = Util.propsfile_to_dict(args.config_args)
else:
# Convert string to dict: http://stackoverflow.com/a/1248990
args.config_args = Util.string_to_dict(args.config_args)
args_total = dict()
for arg in args.config_args:
if os.path.isfile(arg):
log.info('Found args file at: %s' % arg)
args_total = Util.merge_two_dicts(args_total, Util.propsfile_to_dict(arg))
else:
# Convert string to dict: http://stackoverflow.com/a/1248990
args_total = Util.merge_two_dicts(args_total, Util.string_to_dict(arg))
args.config_args = args_total
return args
......@@ -117,12 +123,14 @@ def main():
Args:
-c --config <config_file> the Stetl config file.
-s --section <section_name> the section in the Stetl config (ini) file to execute (default is [etl]).
-a --args <arglist> substitutable args for symbolic, {arg}, values in Stetl config file, in format "arg1=foo arg2=bar" etc.
-a --args <arglist> sero or more substitutable args for symbolic, {arg}, values in Stetl config file, in format -a arg1=foo -a arg2=bar etc.
-d --doc <class> Get component documentation like its configuration parameters, e.g. stetl --doc stetl.inputs.fileinput.FileInput
-h --help get help info
"""
args = parse_args()
# Pass arguments explicitly, facilitates testing
args = parse_args(sys.argv[1:])
if args.config_file:
# Do the ETL
......
......@@ -88,16 +88,17 @@ class PostgresDbOutput(DbOutput):
class PostgresInsertOutput(PostgresDbOutput):
"""
Output by inserting single record into Postgres database.
Input is a record (Python dic structure) or a Python list of dicts (records).
Output by inserting a single record in a Postgres database table.
Input is a Stetl record (Python dict structure) or a list of records.
Creates an INSERT for Postgres to insert each single record.
When the "replace" parameter is True, any existing record keyed by "key" is
attempted to be deleted first.
attempted to be UPDATEd first.
NB a constraint is that each record needs to contain all values as
an INSERT query is built once for the columns in the first record.
NB a constraint is that the first and each subsequent each record needs to contain
all values as an INSERT and UPDATE query template is built once for the columns
in the first record.
consumes=FORMAT.record
consumes=[FORMAT.record_array, FORMAT.record]
"""
# Start attribute config meta
......@@ -127,6 +128,7 @@ class PostgresInsertOutput(PostgresDbOutput):
def __init__(self, configdict, section, consumes=FORMAT.record):
DbOutput.__init__(self, configdict, section, consumes=[FORMAT.record_array, FORMAT.record])
self.query = None
self.update_query = None
self.db = None
def init(self):
......@@ -149,14 +151,31 @@ class PostgresInsertOutput(PostgresDbOutput):
log.info('query is %s', query)
return query
def create_update_query(self, record):
# We assume that all records do the same UPDATE key/values
# https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/6527838#6527838
# e.g. UPDATE table SET field='C', field2='Z' WHERE id=3;
query = "UPDATE %s SET (%s) = (%s) WHERE %s = %s" % (
self.cfg.get('table'), ",".join(['%s ' % k for k in record]), ",".join(["%s", ] * len(record.keys())), self.key, "%s")
log.info('update query is %s', query)
return query
def insert(self, record):
res = 0
if self.replace and self.key and self.key in record:
# Try to delete (replace option)
del_query = "DELETE FROM %s WHERE %s = '%s'" % (self.cfg.get('table'), self.key, record[self.key])
self.db.execute(del_query)
# Do insert with values from the record dict
self.db.execute(self.query, record.values())
# Replace option: try UPDATE if existing
# https://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/6527838#6527838
values = record.values()
values.append(record[self.key])
res = self.db.execute(self.update_query, values)
# del_query = "DELETE FROM %s WHERE %s = '%s'" % (self.cfg.get('table'), self.key, record[self.key])
# res = self.db.execute(del_query)
if res < 1:
# Do insert with values from the record dict
# only if we did not do an UPDATE (res==0) on existing record.
self.db.execute(self.query, record.values())
self.db.commit(close=False)
def write(self, packet):
......@@ -174,10 +193,13 @@ class PostgresInsertOutput(PostgresDbOutput):
if type(record) is list and len(record) > 0:
first_record = record[0]
# Create query once
# Create INSERT and optional UPDATE query-templates once
if self.query is None:
self.query = self.create_query(first_record)
if self.replace and self.key and not self.update_query:
self.update_query = self.create_update_query(first_record)
# Check if record is single (dict) or array (list of dict)
if type(record) is dict:
# Do insert with values from the single record
......
......@@ -20,6 +20,22 @@ class ExecOutput(Output):
Executes any command (abstract base class).
"""
@Config(ptype=str, default='', required=False)
def env_args(self):
"""
Provides of list of environment variables which will be used when executing the given command.
Example: env_args = pgpassword=postgres othersetting=value~with~spaces
"""
pass
@Config(ptype=str, default='=', required=False)
def env_separator(self):
"""
Provides the separator to split the environment variable names from their values.
"""
pass
def __init__(self, configdict, section, consumes):
Output.__init__(self, configdict, section, consumes)
......@@ -27,13 +43,16 @@ class ExecOutput(Output):
return packet
def execute_cmd(self, cmd):
use_shell = True
if os.name == 'nt':
use_shell = False
env_vars = Util.string_to_dict(self.env_args, self.env_separator)
old_environ = os.environ.copy()
log.info("executing cmd=%s" % cmd)
subprocess.call(cmd, shell=use_shell)
log.info("execute done")
try:
os.environ.update(env_vars)
log.info("executing cmd=%s" % cmd)
subprocess.call(cmd, shell=True)
log.info("execute done")
finally:
os.environ = old_environ
class CommandExecOutput(ExecOutput):
......@@ -162,10 +181,10 @@ class Ogr2OgrExecOutput(ExecOutput):
# the same base name
# Copy the .gfs file if required, use the same base name
# so ogr2ogr will pick it up.
gfs_path = None
# Always assemble the GFS path, in case it is provided from outside.
file_ext = os.path.splitext(file_path)
gfs_path = file_ext[0] + '.gfs'
if self.gfs_template:
file_ext = os.path.splitext(file_path)
gfs_path = file_ext[0] + '.gfs'
shutil.copy(self.gfs_template, gfs_path)
# Append file name to command as last argument
......@@ -173,5 +192,5 @@ class Ogr2OgrExecOutput(ExecOutput):
if self.cleanup_input:
os.remove(file_path)
if gfs_path:
if gfs_path and os.path.exists(gfs_path):
os.remove(gfs_path)
......@@ -111,6 +111,16 @@ class Util:
# Convert/flatten dict to string http://codereview.stackexchange.com/questions/7953/how-do-i-flatten-a-dictionary-into-a-string
return ' '.join("%s=%r" % (key, val) for (key, val) in dict_v.iteritems())
@staticmethod
def merge_two_dicts(x, y):
"""
Given two dicts, merge them into a new dict as a shallow copy.
From: https://stackoverflow.com/questions/38987/how-to-merge-two-dictionaries-in-a-single-expression
"""
z = x.copy()
z.update(y)
return z
# Convert a properties file to a dict
@staticmethod
def propsfile_to_dict(file_path):
......@@ -475,4 +485,12 @@ class ConfigSection():
return result
def to_string(self):
return repr(self.config_dict)
# Need to hide some sensitive values, usually used for logging
safe_copy = self.config_dict.copy()
hides = ['passw', 'pasw', 'token', 'user']
for key in safe_copy:
for hide_key in hides:
if hide_key in key.lower():
safe_copy[key] = '<hidden>'
return repr(safe_copy)
......@@ -181,7 +181,7 @@ class parser:
self._pattern = '^' + ' '.join(subpatterns) + '$'
try:
self._regex = re.compile(self._pattern)
except Exception, e:
except Exception as e:
raise ApacheLogParserError(e)
def parse(self, line):
......@@ -191,58 +191,61 @@ class parser:
Raises and exception if it couldn't parse the line
"""
line = line.strip()
match = self._regex.match(line)
if match:
data = {}
for k, v in zip(self._names, match.groups()):
# JvdB convert to native Python types if needed
if self._options['use_native_types']:
if k in ['%>s', '%b', '%D']:
try:
v = int(v)
except Exception:
v = 0
elif k == '%t':
try:
v = int(parse_date(v)[0])
except Exception:
v = 0
elif v == '-':
v = None
# JvdB: elaborate request '%r' string
if k == '%r':
v_elms = v.split(' ')
# Filter out methods of no interest
if v_elms[0] not in self._options['methods']:
return None
if self._options['request_path_only']:
data = None
try:
line = line.strip()
match = self._regex.match(line)
if match:
data = {}
for k, v in zip(self._names, match.groups()):
# JvdB convert to native Python types if needed
if self._options['use_native_types']:
if k in ['%>s', '%b', '%D']:
try:
v = int(v)
except Exception:
v = 0
elif k == '%t':
try:
v = int(parse_date(v)[0])
except Exception:
v = 0
elif v == '-':
v = None
# JvdB: elaborate request '%r' string
if k == '%r':
v_elms = v.split(' ')
# Filter out methods of no interest
if v_elms[0] not in self._options['methods']:
return None
if self._options['request_path_only']:
try:
v = v.split(' ')[1]
except Exception:
v = ''
# JvdB map %-like keys to readable names using key map
if self._key_map:
try:
v = v.split(' ')[1]
except Exception:
v = ''
# JvdB map %-like keys to readable names using key map
if self._key_map:
try:
data[self._key_map[k]] = v
except KeyError:
pass
else:
data[k] = v
data[self._key_map[k]] = v
except KeyError:
pass
else:
data[k] = v
# JvdB option to generate unique key, e.g. for database insert
if self._options['gen_key']:
# Generate unique key as md5-string from all values
data['key'] = hashlib.md5(str(data.values())).hexdigest()
# JvdB option to generate unique key, e.g. for database insert
if self._options['gen_key']:
# Generate unique key as md5-string from all values
data['key'] = hashlib.md5(str(data.values())).hexdigest()
return data
except Exception as e:
raise ApacheLogParserError("Unable to parse: %s with the %s regular expression e=%s" % (line, self._pattern, str(e)))
raise ApacheLogParserError("Unable to parse: %s with the %s regular expression" % (line, self._pattern))
return data
def alias(self, name):
"""
......
__version__ = "1.1"
__version__ = "1.2"
!coverage.py: This is a private format, don't read it directly!{"lines":{"/Users/just/project/stetl/git/tests/outputs/test_standard_output.py":[],"/Users/just/project/stetl/git/tests/filters/__init__.py":[],"/Users/just/project/stetl/git/tests/test_args.py":[],"/Users/just/project/stetl/git/tests/inputs/test_json_file_input.py":[],"/Users/just/project/stetl/git/tests/filters/test_command_exec_filter.py":[],"/Users/just/project/stetl/git/tests/inputs/test_xml_element_streamer_file_input.py":[],"/Users/just/project/stetl/git/tests/outputs/__init__.py":[],"/Users/just/project/stetl/git/tests/filters/test_xml_element_reader.py":[],"/Users/just/project/stetl/git/tests/inputs/test_line_streamer_file_input.py":[],"/Users/just/project/stetl/git/tests/outputs/test_ogr2ogr_exec_output.py":[],"/Users/just/project/stetl/git/tests/inputs/test_ogr_input.py":[],"/Users/just/project/stetl/git/tests/outputs/test_postgres_db_output.py":[],"/Users/just/project/stetl/git/tests/filters/test_regex_filter.py":[],"/Users/just/project/stetl/git/tests/outputs/test_split_outputs.py":[],"/Users/just/project/stetl/git/tests/filters/test_zip_file_extractor.py":[],"/Users/just/project/stetl/git/tests/inputs/test_string_file_input.py":[],"/Users/just/project/stetl/git/tests/filters/test_packet_writer.py":[],"/Users/just/project/stetl/git/tests/outputs/test_command_exec_output.py":[],"/Users/just/project/stetl/git/tests/filters/test_xslt_filter.py":[],"/Users/just/project/stetl/git/tests/inputs/test_merger_multi_input.py":[],"/Users/just/project/stetl/git/tests/test_chain.py":[],"/Users/just/project/stetl/git/tests/stetl_test_case.py":[],"/Users/just/project/stetl/git/tests/filters/test_xml_assembler.py":[],"/Users/just/project/stetl/git/tests/filters/test_string_substitution_filter.py":[],"/Users/just/project/stetl/git/tests/__init__.py":[],"/Users/just/project/stetl/git/tests/inputs/test_csv_file_input.py":[],"/Users/just/project/stetl/git/tests/inputs/test_zip_file_input.py":[],"/Users/just/project/stetl/git/tests/inputs/__init__.py":[],"/Users/just/project/stetl/git/tests/inputs/test_glob_file_input.py":[],"/Users/just/project/stetl/git/tests/test_config.py":[]}}
\ No newline at end of file
# Copies an input file to an output file using substitutable args.
[etl]
chains = input_file|output_file
[input_file]
class = inputs.fileinput.XmlFileInput
file_path = {in_file}
[output_file]
class = outputs.fileoutput.FileOutput
file_path = {out_file}
in_file=default_infile.txt
out_file=default_outfile.txt
city,lat,lon
amsterdam,52.4,4.9
otterlo,52.101,5.773
rotterdam,51.9,4.5
eindhoven,51.44,5.47
python -c "print '{0}/{1}'.format('foo','bar')"
\ No newline at end of file
INFO: Open of `bgt_gebouwinstallatie.gml'
using driver `GML' successful.
Layer name: BuildingInstallation
Geometry: Curve Polygon
Feature Count: 1162
Extent: (93971.990000, 433941.050000) - (96020.190000, 436004.350000)
Layer SRS WKT:
PROJCS["Amersfoort / RD New",
GEOGCS["Amersfoort",