Skip to content
Commits on Source (2)
q2cli/_version.py export-subst
qiime2/_version.py export-subst
......@@ -63,3 +63,5 @@ target/
# vi
.*.swp
.DS_Store
......@@ -14,12 +14,10 @@ install:
- wget -q https://raw.githubusercontent.com/qiime2/environment-files/master/latest/staging/qiime2-latest-py36-linux-conda.yml
- conda env create -q -n test-env --file qiime2-latest-py36-linux-conda.yml
- source activate test-env
- conda install -q nose
- pip install -q flake8
- conda install nose
- pip install -q https://github.com/qiime2/q2lint/archive/master.zip
- make install
script:
- make lint
- make test
- QIIMETEST= source tab-qiime
- QIIMETEST= qiime info
include versioneer.py
include q2cli/_version.py
include qiime2/_version.py
......@@ -15,12 +15,12 @@ test: all
install: all
$(PYTHON) setup.py install && \
mkdir -p $(PREFIX)/etc/conda/activate.d && \
cp hooks/50_activate_q2cli_tab_completion.sh $(PREFIX)/etc/conda/activate.d/
cp hooks/00_activate_qiime2_envs.sh $(PREFIX)/etc/conda/activate.d/ && \
mkdir -p $(PREFIX)/etc/conda/deactivate.d && \
cp hooks/00_deactivate_qiime2_envs.sh $(PREFIX)/etc/conda/deactivate.d/
dev: all
pip install -e . && \
mkdir -p $(PREFIX)/etc/conda/activate.d && \
cp hooks/50_activate_q2cli_tab_completion.sh $(PREFIX)/etc/conda/activate.d/
pip install -e .
clean: distclean
......
# q2cli
A [click-based](http://click.pocoo.org/) command line interface for [QIIME 2](https://github.com/qiime2/qiime2).
# QIIME 2
## Installation and getting help
[![Build Status](https://travis-ci.org/qiime2/qiime2.svg?branch=master)](https://travis-ci.org/qiime2/qiime2)
Visit https://qiime2.org to learn more about q2cli and the QIIME 2 project.
Source code repository for the QIIME 2 framework.
## Enabling tab completion
QIIME 2™ is a powerful, extensible, and decentralized microbiome bioinformatics platform that is free, open source, and community developed. With a focus on data and analysis transparency, QIIME 2 enables researchers to start an analysis with raw DNA sequence data and finish with publication-quality figures and statistical results.
### Bash
Visit [https://qiime2.org](https://qiime2.org) to learn more about the QIIME 2 project.
To enable tab completion in Bash, run the following command or add it to your `.bashrc`/`.bash_profile`:
## Installation
```bash
source tab-qiime
```
Detailed instructions are available in the [documentation](https://docs.qiime2.org/).
### ZSH
## Users
Head to the [user docs](https://docs.qiime2.org/) for help getting started, core concepts, tutorials, and other resources.
To enable tab completion in ZSH, run the following commands or add them to your `.zshrc`:
Just have a question? Please ask it in our [forum](https://forum.qiime2.org/c/user-support).
```bash
autoload bashcompinit && bashcompinit && source tab-qiime
```
## Developers
Please visit the [contributing page](https://github.com/qiime2/qiime2/blob/master/.github/CONTRIBUTING.md) for more information on contributions, documentation links, and more.
## Citing QIIME 2
If you use QIIME 2 for any published research, please include the following citation:
> Bolyen E, Rideout JR, Dillon MR, Bokulich NA, Abnet CC, Al-Ghalith GA, Alexander H, Alm EJ, Arumugam M, Asnicar F, Bai Y, Bisanz JE, Bittinger K, Brejnrod A, Brislawn CJ, Brown CT, Callahan BJ, Caraballo-Rodríguez AM, Chase J, Cope EK, Da Silva R, Diener C, Dorrestein PC, Douglas GM, Durall DM, Duvallet C, Edwardson CF, Ernst M, Estaki M, Fouquier J, Gauglitz JM, Gibbons SM, Gibson DL, Gonzalez A, Gorlick K, Guo J, Hillmann B, Holmes S, Holste H, Huttenhower C, Huttley GA, Janssen S, Jarmusch AK, Jiang L, Kaehler BD, Kang KB, Keefe CR, Keim P, Kelley ST, Knights D, Koester I, Kosciolek T, Kreps J, Langille MGI, Lee J, Ley R, Liu YX, Loftfield E, Lozupone C, Maher M, Marotz C, Martin BD, McDonald D, McIver LJ, Melnik AV, Metcalf JL, Morgan SC, Morton JT, Naimey AT, Navas-Molina JA, Nothias LF, Orchanian SB, Pearson T, Peoples SL, Petras D, Preuss ML, Pruesse E, Rasmussen LB, Rivers A, Robeson MS, Rosenthal P, Segata N, Shaffer M, Shiffer A, Sinha R, Song SJ, Spear JR, Swafford AD, Thompson LR, Torres PJ, Trinh P, Tripathi A, Turnbaugh PJ, Ul-Hasan S, van der Hooft JJJ, Vargas F, Vázquez-Baeza Y, Vogtmann E, von Hippel M, Walters W, Wan Y, Wang M, Warren J, Weber KC, Williamson CHD, Willis AD, Xu ZZ, Zaneveld JR, Zhang Y, Zhu Q, Knight R, and Caporaso JG. 2019. Reproducible, interactive, scalable and extensible microbiome data science using QIIME 2. Nature Biotechnology. https://doi.org/10.1038/s41587-019-0209-9
#!/usr/bin/env bash
# Bash completion script that defers to a cached completion script representing
# the state of the current QIIME 2 deployment.
#
# This script is intended to be executed on the command-line or in
# .bashrc/.bash_profile:
#
# source tab-qiime
#
_qiime_completion()
{
# Attempt to find the cached completion script. If q2cli isn't installed, or
# is an incompatible version, don't attempt completion.
local completion_path="$(python -c "import q2cli.util; print(q2cli.util.get_completion_path())" 2> /dev/null)"
if [[ $? != 0 ]]; then
unset COMPREPLY
return 0
fi
# If the completion script exists, attempt completion by invoking the script
# in a subshell, supplying COMP_WORDS and COMP_CWORD. Capture the output as
# the completion reply. If the completion script failed, don't attempt
# completion.
if [[ -f "$completion_path" ]] ; then
COMPREPLY=( $(COMP_WORDS="${COMP_WORDS[*]}" COMP_CWORD="${COMP_CWORD}" "$completion_path" 2> /dev/null) )
if [[ $? != 0 ]]; then
unset COMPREPLY
return 0
fi
else
unset COMPREPLY
return 0
fi
return 0
}
# Enable default readline and bash completion behavior when `_qiime_completion`
# doesn't have a reply.
complete -F _qiime_completion -o default -o bashdefault qiime
# Execute a `qiime` command (any command will do) so that tab-completion will
# work out-of-the-box (e.g. with a fresh installation of q2cli). Running a
# command will create or refresh the cache if necessary, which contains the
# actual completion script.
#
# Ignore stdout to avoid displaying help text to users enabling tab-completion.
# stderr displays the note about cache refreshing, as that can take a few
# moments to complete.
qiime > /dev/null
{% set data = load_setup_py_data() %}
{% set version = data.get('version') or 'placehold' %}
{% set release = '.'.join(version.split('.')[:2]) %}
{% set version = data.get('version') %}
package:
name: q2cli
name: qiime2
version: {{ version }}
source:
......@@ -11,26 +10,32 @@ source:
build:
script: make install
entry_points:
- qiime=q2cli.__main__:qiime
requirements:
host:
- python {{ python }}
- python {{ python }}
- setuptools
run:
- python {{ python }}
- pip
- click
- qiime2 {{ release }}.*
- python {{ python }}
- pyyaml
- decorator
# TODO: unset this pin once pandas 0.25.x is sorted out
- pandas 0.24.2
- tzlocal
- python-dateutil
- pyparsing ==2.3.0
- bibtexparser
- networkx
test:
imports:
- q2cli
- qiime2
commands:
- QIIMETEST= qiime --help
# TODO don't require devs to remember setting this env var before running
# tests. The value can be anything.
- QIIMETEST= python -c "import qiime2.plugins.dummy_plugin"
about:
home: https://qiime2.org
......
#!/bin/sh
export MPLBACKEND='Agg'
export R_LIBS_USER=$CONDA_PREFIX/lib/R/library/
export PYTHONNOUSERSITE=$CONDA_PREFIX/lib/python*/site-packages/
#!/bin/sh
unset MPLBACKEND
unset R_LIBS_USER
unset PYTHONNOUSERSITE
if [ -n "${ZSH_VERSION-}" ]; then
autoload bashcompinit && bashcompinit && source tab-qiime
elif [ -n "${BASH_VERSION-}" ]; then
source tab-qiime
fi
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
from q2cli.click.command import ToolCommand, ToolGroupCommand
@click.group(help='Utilities for developers and advanced users.',
cls=ToolGroupCommand)
def dev():
pass
@dev.command(name='refresh-cache',
short_help='Refresh CLI cache.',
help="Refresh the CLI cache. Use this command if you are "
"developing a plugin, or q2cli itself, and want your "
"changes to take effect in the CLI. A refresh of the cache "
"is necessary because package versions do not typically "
"change each time an update is made to a package's code. "
"Setting the environment variable Q2CLIDEV to any value "
"will always refresh the cache when a command is run.",
cls=ToolCommand)
def refresh_cache():
import q2cli.core.cache
q2cli.core.cache.CACHE.refresh()
import_theme_help = \
("Allows for customization of q2cli's command line styling based on an "
"imported .theme (INI formatted) file. If you are unfamiliar with .ini "
"formatted files look here https://en.wikipedia.org/wiki/INI_file."
"\n"
"\n"
"The .theme file allows you to customize text on the basis of what that "
"text represents with the following supported text types: command, "
"option, type, default_arg, required, emphasis, problem, warning, error, "
"and success. These will be your headers in the '[]' brackets. "
"\n"
"\n"
"`command` refers to the name of the command you issued. `option` refers "
"to the arguments you give to the command when running it. `type` refers "
"to the QIIME 2 semantic typing of these arguments (where applicable). "
"`default_arg` refers to the label next to the argument indicating its "
"default value (where applicable), and if it is required (where "
"applicable). `required` refers to any arguments that must be passed to "
"the command for it to work and gives them special formatting on top of "
"your normal `option` formatting. `emphasis` refers to any emphasized "
"pieces of text within help text. `problem` refers to the text informing "
"you there were issues with the command. `warning` refers to the text "
"for non-fatal issues while `error` refers to the text for fatal issues."
"`success` refers to text indicating a process completed as expected."
"\n"
"\n"
"Depending on what your terminal supports, some or all of the following "
"pieces of the text's formatting may be customized: bold, dim (if true "
"the text's brightness is reduced), underline, blink, reverse (if true "
"foreground and background colors are reversed), and finally fg "
"(foreground color) and bg (background color). The first five may each "
"be either true or false, while the colors may be set to any of the "
"following: black, red, green, yellow, blue, magenta, cyan, white, "
"bright_black, bright_red, bright_green, bright_yellow, bright_blue, "
"bright_magenta, bright_cyan, or bright_white.")
@dev.command(name='import-theme',
short_help='Install new command line theme.',
help=import_theme_help,
cls=ToolCommand)
@click.option('--theme', required=True,
type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True),
help='Path to file containing new theme info')
def import_theme(theme):
import os
import shutil
from configparser import Error
import q2cli.util
from q2cli.core.config import CONFIG
try:
CONFIG.parse_file(theme)
except Error as e:
# If they tried to change [error] in a valid manner before we hit our
# parsing error, we don't want to use their imported error settings
CONFIG.styles = CONFIG.get_default_styles()
header = 'Something went wrong while parsing your theme: '
q2cli.util.exit_with_error(e, header=header, traceback=None)
shutil.copy(theme, os.path.join(q2cli.util.get_app_dir(),
'cli-colors.theme'))
@dev.command(name='export-default-theme',
short_help='Export the default settings.',
help='Create a .theme (INI formatted) file from the default '
'settings at the specified filepath.',
cls=ToolCommand)
@click.option('--output-path', required=True,
type=click.Path(exists=False, file_okay=True,
dir_okay=False, readable=True),
help='Path to output the config to')
def export_default_theme(output_path):
import configparser
from q2cli.core.config import CONFIG
parser = configparser.ConfigParser()
parser.read_dict(CONFIG.get_default_styles())
with open(output_path, 'w') as fh:
parser.write(fh)
def abort_if_false(ctx, param, value):
if not value:
ctx.abort()
@dev.command(name='reset-theme',
short_help='Reset command line theme to default.',
help="Reset command line theme to default. Requres the '--yes' "
"parameter to be passed asserting you do want to reset.",
cls=ToolCommand)
@click.option('--yes', is_flag=True, callback=abort_if_false,
expose_value=False,
prompt='Are you sure you want to reset your theme?')
def reset_theme():
import os
import q2cli.util
path = os.path.join(q2cli.util.get_app_dir(), 'cli-colors.theme')
if os.path.exists(path):
os.unlink(path)
click.echo('Theme reset.')
else:
click.echo('Theme was already default.')
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
from q2cli.click.command import ToolCommand
def _echo_version():
import sys
import qiime2
import q2cli
pyver = sys.version_info
click.echo('Python version: %d.%d.%d' %
(pyver.major, pyver.minor, pyver.micro))
click.echo('QIIME 2 release: %s' % qiime2.__release__)
click.echo('QIIME 2 version: %s' % qiime2.__version__)
click.echo('q2cli version: %s' % q2cli.__version__)
def _echo_plugins():
import q2cli.core.cache
plugins = q2cli.core.cache.CACHE.plugins
if plugins:
for name, plugin in sorted(plugins.items()):
click.echo('%s: %s' % (name, plugin['version']))
else:
click.secho('No plugins are currently installed.\nYou can browse '
'the official QIIME 2 plugins at https://qiime2.org')
@click.command(help='Display information about current deployment.',
cls=ToolCommand)
def info():
import q2cli.util
# This import improves performance for repeated _echo_plugins
import q2cli.core.cache
click.secho('System versions', fg='green')
_echo_version()
click.secho('\nInstalled plugins', fg='green')
_echo_plugins()
click.secho('\nApplication config directory', fg='green')
click.secho(q2cli.util.get_app_dir())
click.secho('\nGetting help', fg='green')
click.secho('To get help with QIIME 2, visit https://qiime2.org')
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import os
import click
import q2cli.util
from q2cli.click.command import ToolCommand, ToolGroupCommand
_COMBO_METAVAR = 'ARTIFACT/VISUALIZATION'
@click.group(help='Tools for working with QIIME 2 files.',
cls=ToolGroupCommand)
def tools():
pass
@tools.command(name='export',
short_help='Export data from a QIIME 2 Artifact '
'or a Visualization',
help='Exporting extracts (and optionally transforms) data '
'stored inside an Artifact or Visualization. Note that '
'Visualizations cannot be transformed with --output-format',
cls=ToolCommand)
@click.option('--input-path', required=True, metavar=_COMBO_METAVAR,
type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True),
help='Path to file that should be exported')
@click.option('--output-path', required=True,
type=click.Path(exists=False, file_okay=True, dir_okay=True,
writable=True),
help='Path to file or directory where '
'data should be exported to')
@click.option('--output-format', required=False,
help='Format which the data should be exported as. '
'This option cannot be used with Visualizations')
def export_data(input_path, output_path, output_format):
import qiime2.util
import qiime2.sdk
import distutils
from q2cli.core.config import CONFIG
result = qiime2.sdk.Result.load(input_path)
if output_format is None:
if isinstance(result, qiime2.sdk.Artifact):
output_format = result.format.__name__
else:
output_format = 'Visualization'
result.export_data(output_path)
else:
if isinstance(result, qiime2.sdk.Visualization):
error = '--output-format cannot be used with visualizations'
click.echo(CONFIG.cfg_style('error', error), err=True)
click.get_current_context().exit(1)
else:
source = result.view(qiime2.sdk.parse_format(output_format))
if os.path.isfile(str(source)):
if os.path.isfile(output_path):
os.remove(output_path)
else:
# create directory (recursively) if it doesn't exist yet
os.makedirs(os.path.dirname(output_path), exist_ok=True)
qiime2.util.duplicate(str(source), output_path)
else:
distutils.dir_util.copy_tree(str(source), output_path)
output_type = 'file' if os.path.isfile(output_path) else 'directory'
success = 'Exported %s as %s to %s %s' % (input_path, output_format,
output_type, output_path)
click.echo(CONFIG.cfg_style('success', success))
def show_importable_types(ctx, param, value):
if not value or ctx.resilient_parsing:
return
import qiime2.sdk
importable_types = sorted(qiime2.sdk.PluginManager().importable_types,
key=repr)
if importable_types:
for name in importable_types:
click.echo(name)
else:
click.echo('There are no importable types in the current deployment.')
ctx.exit()
def show_importable_formats(ctx, param, value):
if not value or ctx.resilient_parsing:
return
import qiime2.sdk
importable_formats = sorted(qiime2.sdk.PluginManager().importable_formats)
if importable_formats:
for format in importable_formats:
click.echo(format)
else:
click.echo('There are no importable formats '
'in the current deployment.')
ctx.exit()
@tools.command(name='import',
short_help='Import data into a new QIIME 2 Artifact.',
help="Import data to create a new QIIME 2 Artifact. See "
"https://docs.qiime2.org/ for usage examples and details "
"on the file types and associated semantic types that can "
"be imported.",
cls=ToolCommand)
@click.option('--type', required=True,
help='The semantic type of the artifact that will be created '
'upon importing. Use --show-importable-types to see what '
'importable semantic types are available in the current '
'deployment.')
@click.option('--input-path', required=True,
type=click.Path(exists=True, file_okay=True, dir_okay=True,
readable=True),
help='Path to file or directory that should be imported.')
@click.option('--output-path', required=True, metavar='ARTIFACT',
type=click.Path(exists=False, file_okay=True, dir_okay=False,
writable=True),
help='Path where output artifact should be written.')
@click.option('--input-format', required=False,
help='The format of the data to be imported. If not provided, '
'data must be in the format expected by the semantic type '
'provided via --type.')
@click.option('--show-importable-types', is_flag=True, is_eager=True,
callback=show_importable_types, expose_value=False,
help='Show the semantic types that can be supplied to --type '
'to import data into an artifact.')
@click.option('--show-importable-formats', is_flag=True, is_eager=True,
callback=show_importable_formats, expose_value=False,
help='Show formats that can be supplied to --input-format to '
'import data into an artifact.')
def import_data(type, input_path, output_path, input_format):
import qiime2.sdk
import qiime2.plugin
from q2cli.core.config import CONFIG
try:
artifact = qiime2.sdk.Artifact.import_data(type, input_path,
view_type=input_format)
except qiime2.plugin.ValidationError as e:
header = 'There was a problem importing %s:' % input_path
q2cli.util.exit_with_error(e, header=header, traceback=None)
except Exception as e:
header = 'An unexpected error has occurred:'
q2cli.util.exit_with_error(e, header=header)
artifact.save(output_path)
if input_format is None:
input_format = artifact.format.__name__
success = 'Imported %s as %s to %s' % (input_path,
input_format,
output_path)
click.echo(CONFIG.cfg_style('success', success))
@tools.command(short_help='Take a peek at a QIIME 2 Artifact or '
'Visualization.',
help="Display basic information about a QIIME 2 Artifact or "
"Visualization, including its UUID and type.",
cls=ToolCommand)
@click.argument('path', type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True),
metavar=_COMBO_METAVAR)
def peek(path):
import qiime2.sdk
from q2cli.core.config import CONFIG
metadata = qiime2.sdk.Result.peek(path)
click.echo(CONFIG.cfg_style('type', "UUID")+": ", nl=False)
click.echo(metadata.uuid)
click.echo(CONFIG.cfg_style('type', "Type")+": ", nl=False)
click.echo(metadata.type)
if metadata.format is not None:
click.echo(CONFIG.cfg_style('type', "Data format")+": ", nl=False)
click.echo(metadata.format)
@tools.command('inspect-metadata',
short_help='Inspect columns available in metadata.',
help='Inspect metadata files or artifacts viewable as metadata.'
' Providing multiple file paths to this command will merge'
' the metadata.',
cls=ToolCommand)
@click.option('--tsv/--no-tsv', default=False,
help='Print as machine-readable TSV instead of text.')
@click.argument('paths', nargs=-1, required=True, metavar='METADATA...',
type=click.Path(exists=True, file_okay=True, dir_okay=False,
readable=True))
@q2cli.util.pretty_failure(traceback=None)
def inspect_metadata(paths, tsv, failure):
m = [_load_metadata(p) for p in paths]
metadata = m[0]
if m[1:]:
metadata = metadata.merge(*m[1:])
# we aren't expecting errors below this point, so set traceback to default
failure.traceback = 'stderr'
failure.header = "An unexpected error has occurred:"
COLUMN_NAME = "COLUMN NAME"
COLUMN_TYPE = "TYPE"
max_name_len = max([len(n) for n in metadata.columns]
+ [len(COLUMN_NAME)])
max_type_len = max([len(p.type) for p in metadata.columns.values()]
+ [len(COLUMN_TYPE)])
if tsv:
import csv
import io
def formatter(*row):
# This is gross, but less gross than robust TSV writing.
with io.StringIO() as fh:
writer = csv.writer(fh, dialect='excel-tab', lineterminator='')
writer.writerow(row)
return fh.getvalue()
else:
formatter = ("{0:>%d} {1:%d}" % (max_name_len, max_type_len)).format
click.secho(formatter(COLUMN_NAME, COLUMN_TYPE), bold=True)
if not tsv:
click.secho(formatter("=" * max_name_len, "=" * max_type_len),
bold=True)
for name, props in metadata.columns.items():
click.echo(formatter(name, props.type))
if not tsv:
click.secho(formatter("=" * max_name_len, "=" * max_type_len),
bold=True)
click.secho(("{0:>%d} " % max_name_len).format("IDS:"),
bold=True, nl=False)
click.echo(metadata.id_count)
click.secho(("{0:>%d} " % max_name_len).format("COLUMNS:"),
bold=True, nl=False)
click.echo(metadata.column_count)
def _load_metadata(path):
import qiime2
import qiime2.sdk
# TODO: clean up duplication between this and the metadata handlers.
try:
artifact = qiime2.sdk.Result.load(path)
except Exception:
metadata = qiime2.Metadata.load(path)
else:
if isinstance(artifact, qiime2.Visualization):
raise Exception("Visualizations cannot be viewed as QIIME 2"
" metadata:\n%r" % path)
elif artifact.has_metadata():
metadata = artifact.view(qiime2.Metadata)
else:
raise Exception("Artifacts with type %r cannot be viewed as"
" QIIME 2 metadata:\n%r" % (artifact.type, path))
return metadata
@tools.command(short_help='View a QIIME 2 Visualization.',
help="Displays a QIIME 2 Visualization until the command "
"exits. To open a QIIME 2 Visualization so it can be "
"used after the command exits, use 'qiime tools extract'.",
cls=ToolCommand)
@click.argument('visualization-path', metavar='VISUALIZATION',
type=click.Path(exists=True, file_okay=True, dir_okay=False,
readable=True))
@click.option('--index-extension', required=False, default='html',
help='The extension of the index file that should be opened. '
'[default: html]')
def view(visualization_path, index_extension):
# Guard headless envs from having to import anything large
import sys
from q2cli.core.config import CONFIG
if not os.getenv("DISPLAY") and sys.platform != "darwin":
raise click.UsageError(
'Visualization viewing is currently not supported in headless '
'environments. You can view Visualizations (and Artifacts) at '
'https://view.qiime2.org, or move the Visualization to an '
'environment with a display and view it with `qiime tools view`.')
import zipfile
import qiime2.sdk
if index_extension.startswith('.'):
index_extension = index_extension[1:]
try:
visualization = qiime2.sdk.Visualization.load(visualization_path)
# TODO: currently a KeyError is raised if a zipped file that is not a
# QIIME 2 result is passed. This should be handled better by the framework.
except (zipfile.BadZipFile, KeyError, TypeError):
raise click.BadParameter(
'%s is not a QIIME 2 Visualization. Only QIIME 2 Visualizations '
'can be viewed.' % visualization_path)
index_paths = visualization.get_index_paths(relative=False)
if index_extension not in index_paths:
raise click.BadParameter(
'No index %s file is present in the archive. Available index '
'extensions are: %s' % (index_extension,
', '.join(index_paths.keys())))
else:
index_path = index_paths[index_extension]
launch_status = click.launch(index_path)
if launch_status != 0:
click.echo(CONFIG.cfg_style('error', 'Viewing visualization '
'failed while attempting to open '
f'{index_path}'), err=True)
else:
while True:
click.echo(
"Press the 'q' key, Control-C, or Control-D to quit. This "
"view may no longer be accessible or work correctly after "
"quitting.", nl=False)
# There is currently a bug in click.getchar where translation
# of Control-C and Control-D into KeyboardInterrupt and
# EOFError (respectively) does not work on Python 3. The code
# here should continue to work as expected when the bug is
# fixed in Click.
#
# https://github.com/pallets/click/issues/583
try:
char = click.getchar()
click.echo()
if char in {'q', '\x03', '\x04'}:
break
except (KeyboardInterrupt, EOFError):
break
@tools.command(short_help="Extract a QIIME 2 Artifact or Visualization "
"archive.",
help="Extract all contents of a QIIME 2 Artifact or "
"Visualization's archive, including provenance, metadata, "
"and actual data. Use 'qiime tools export' to export only "
"the data stored in an Artifact or Visualization, with "
"the choice of exporting to different formats.",
cls=ToolCommand)
@click.option('--input-path', required=True, metavar=_COMBO_METAVAR,
type=click.Path(exists=True, file_okay=True, dir_okay=False,
readable=True),
help='Path to file that should be extracted')
@click.option('--output-path', required=False,
type=click.Path(exists=False, file_okay=False, dir_okay=True,
writable=True),
help='Directory where archive should be extracted to '
'[default: current working directory]',
default=os.getcwd())
def extract(input_path, output_path):
import zipfile
import qiime2.sdk
from q2cli.core.config import CONFIG
try:
extracted_dir = qiime2.sdk.Result.extract(input_path, output_path)
except (zipfile.BadZipFile, ValueError):
raise click.BadParameter(
'%s is not a valid QIIME 2 Result. Only QIIME 2 Artifacts and '
'Visualizations can be extracted.' % input_path)
else:
success = 'Extracted %s to directory %s' % (input_path, extracted_dir)
click.echo(CONFIG.cfg_style('success', success))
@tools.command(short_help='Validate data in a QIIME 2 Artifact.',
help='Validate data in a QIIME 2 Artifact. QIIME 2 '
'automatically performs some basic validation when '
'managing your data; use this command to perform explicit '
'and/or more thorough validation of your data (e.g. when '
'debugging issues with your data or analyses).\n\nNote: '
'validation can take some time to complete, depending on '
'the size and type of your data.',
cls=ToolCommand)
@click.argument('path', type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True),
metavar=_COMBO_METAVAR)
@click.option('--level', required=False, type=click.Choice(['min', 'max']),
help='Desired level of validation. "min" will perform minimal '
'validation, and "max" will perform maximal validation (at '
'the potential cost of runtime).',
default='max', show_default=True)
def validate(path, level):
import qiime2.sdk
from q2cli.core.config import CONFIG
try:
result = qiime2.sdk.Result.load(path)
except Exception as e:
header = 'There was a problem loading %s as a QIIME 2 Result:' % path
q2cli.util.exit_with_error(e, header=header)
try:
result.validate(level)
except qiime2.plugin.ValidationError as e:
header = 'Result %s does not appear to be valid at level=%s:' % (
path, level)
q2cli.util.exit_with_error(e, header=header, traceback=None)
except Exception as e:
header = ('An unexpected error has occurred while attempting to '
'validate result %s:' % path)
q2cli.util.exit_with_error(e, header=header)
else:
click.echo(CONFIG.cfg_style('success', f'Result {path} appears to be '
f'valid at level={level}.'))
@tools.command(short_help='Print citations for a QIIME 2 result.',
help='Print citations as a BibTex file (.bib) for a QIIME 2'
' result.',
cls=ToolCommand)
@click.argument('path', type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True),
metavar=_COMBO_METAVAR)
def citations(path):
import qiime2.sdk
import io
from q2cli.core.config import CONFIG
ctx = click.get_current_context()
try:
result = qiime2.sdk.Result.load(path)
except Exception as e:
header = 'There was a problem loading %s as a QIIME 2 result:' % path
q2cli.util.exit_with_error(e, header=header)
if result.citations:
with io.StringIO() as fh:
result.citations.save(fh)
click.echo(fh.getvalue(), nl=False)
ctx.exit(0)
else:
click.echo(CONFIG.cfg_style('problem', 'No citations found.'),
err=True)
ctx.exit(1)
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Some of the source code in this file is derived from original work:
#
# Copyright (c) 2014 by the Pallets team.
#
# To see the license for the original work, see licenses/click.LICENSE.rst
# Specific reproduction and derivation of original work is marked below.
# ----------------------------------------------------------------------------
import click
import click.core
class BaseCommandMixin:
# Modified from original:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L867 >
# Copyright (c) 2014 by the Pallets team.
def make_parser(self, ctx):
"""Creates the underlying option parser for this command."""
from .parser import Q2Parser
parser = Q2Parser(ctx)
for param in self.get_params(ctx):
param.add_to_parser(parser, ctx)
return parser
# Modified from original:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L934 >
# Copyright (c) 2014 by the Pallets team.
def parse_args(self, ctx, args):
from q2cli.core.config import CONFIG
if isinstance(self, click.MultiCommand):
return super().parse_args(ctx, args)
errors = []
parser = self.make_parser(ctx)
skip_rest = False
for _ in range(10): # surely this is enough attempts
try:
opts, args, param_order = parser.parse_args(args=args)
break
except click.ClickException as e:
errors.append(e)
skip_rest = True
if not skip_rest:
for param in click.core.iter_params_for_processing(
param_order, self.get_params(ctx)):
try:
value, args = param.handle_parse_result(ctx, opts, args)
except click.ClickException as e:
errors.append(e)
if args and not ctx.allow_extra_args and not ctx.resilient_parsing:
errors.append(click.UsageError(
'Got unexpected extra argument%s (%s)'
% (len(args) != 1 and 's' or '',
' '.join(map(click.core.make_str, args)))))
if errors:
click.echo(ctx.get_help()+"\n", err=True)
if len(errors) > 1:
problems = 'There were some problems with the command:'
else:
problems = 'There was a problem with the command:'
click.echo(CONFIG.cfg_style('problem',
problems.center(78, ' ')), err=True)
for idx, e in enumerate(errors, 1):
msg = click.formatting.wrap_text(
e.format_message(),
initial_indent=' (%d/%d%s) ' % (idx, len(errors),
'?' if skip_rest else ''),
subsequent_indent=' ')
click.secho(CONFIG.cfg_style('error', msg), err=True)
ctx.exit(1)
ctx.args = args
return args
def get_option_names(self, ctx):
if not hasattr(self, '__option_names'):
names = set()
for param in self.get_params(ctx):
if hasattr(param, 'q2_name'):
names.add(param.q2_name)
else:
names.add(param.name)
self.__option_names = names
return self.__option_names
def list_commands(self, ctx):
if not hasattr(super(), 'list_commands'):
return []
return super().list_commands(ctx)
def get_opt_groups(self, ctx):
return {'Options': list(self.get_params(ctx))}
def format_help_text(self, ctx, formatter):
super().format_help_text(ctx, formatter)
formatter.write_paragraph()
# Modified from original:
# < https://github.com/pallets/click/blob
# /c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L830 >
# Copyright (c) 2014 by the Pallets team.
def format_usage(self, ctx, formatter):
from q2cli.core.config import CONFIG
"""Writes the usage line into the formatter."""
pieces = self.collect_usage_pieces(ctx)
formatter.write_usage(CONFIG.cfg_style('command', ctx.command_path),
' '.join(pieces))
def format_options(self, ctx, formatter, COL_MAX=23, COL_MIN=10):
from q2cli.core.config import CONFIG
# write options
opt_groups = {}
records = []
for group, options in self.get_opt_groups(ctx).items():
opt_records = []
for o in options:
record = o.get_help_record(ctx)
if record is None:
continue
opt_records.append((o, record))
records.append(record)
opt_groups[group] = opt_records
first_columns = (r[0] for r in records)
border = min(COL_MAX, max(COL_MIN, *(len(col) for col in first_columns
if len(col) < COL_MAX)))
for opt_group, opt_records in opt_groups.items():
if not opt_records:
continue
formatter.write_heading(click.style(opt_group, bold=True))
formatter.indent()
padded_border = border + formatter.current_indent
for opt, record in opt_records:
self.write_option(ctx, formatter, opt, record, padded_border)
formatter.dedent()
# Modified from original:
# https://github.com/pallets/click/blob
# /c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L1056
# Copyright (c) 2014 by the Pallets team.
commands = []
for subcommand in self.list_commands(ctx):
cmd = self.get_command(ctx, subcommand)
# What is this, the tool lied about a command. Ignore it
if cmd is None:
continue
if cmd.hidden:
continue
commands.append((subcommand, cmd))
# allow for 3 times the default spacing
if len(commands):
limit = formatter.width - 6 - max(len(cmd[0]) for cmd in commands)
rows = []
for subcommand, cmd in commands:
help = cmd.get_short_help_str(limit)
rows.append((CONFIG.cfg_style('command', subcommand), help))
if rows:
with formatter.section(click.style('Commands', bold=True)):
formatter.write_dl(rows)
def write_option(self, ctx, formatter, opt, record, border, COL_SPACING=2):
import itertools
from q2cli.core.config import CONFIG
full_width = formatter.width - formatter.current_indent
indent_text = ' ' * formatter.current_indent
opt_text, help_text = record
opt_text_secondary = None
if type(opt_text) is tuple:
opt_text, opt_text_secondary = opt_text
help_text, requirements = self._clean_help(help_text)
type_placement = None
type_repr = None
type_indent = 2 * indent_text
if hasattr(opt.type, 'get_type_repr'):
type_repr = opt.type.get_type_repr(opt)
if type_repr is not None:
if len(type_repr) <= border - len(type_indent):
type_placement = 'under'
else:
type_placement = 'beside'
if len(opt_text) > border:
lines = simple_wrap(opt_text, full_width)
else:
lines = [opt_text.split(' ')]
if opt_text_secondary is not None:
lines.append(opt_text_secondary.split(' '))
to_write = []
for tokens in lines:
dangling_edge = formatter.current_indent
styled = []
for token in tokens:
dangling_edge += len(token) + 1
if token.startswith('--'):
token = CONFIG.cfg_style('option', token,
required=opt.required)
styled.append(token)
line = indent_text + ' '.join(styled)
to_write.append(line)
formatter.write('\n'.join(to_write))
dangling_edge -= 1
if type_placement == 'beside':
lines = simple_wrap(type_repr, formatter.width - len(type_indent),
start_col=dangling_edge - 1)
to_write = []
first_iter = True
for tokens in lines:
line = ' '.join(tokens)
if first_iter:
dangling_edge += 1 + len(line)
line = " " + CONFIG.cfg_style('type', line)
first_iter = False
else:
dangling_edge = len(type_indent) + len(line)
line = type_indent + CONFIG.cfg_style('type', line)
to_write.append(line)
formatter.write('\n'.join(to_write))
if dangling_edge + 1 > border + COL_SPACING:
formatter.write('\n')
left_col = []
else:
padding = ' ' * (border + COL_SPACING - dangling_edge)
formatter.write(padding)
dangling_edge += len(padding)
left_col = [''] # jagged start
if type_placement == 'under':
padding = ' ' * (border + COL_SPACING
- len(type_repr) - len(type_indent))
line = ''.join(
[type_indent, CONFIG.cfg_style('type', type_repr), padding])
left_col.append(line)
if hasattr(opt, 'meta_help') and opt.meta_help is not None:
meta_help = simple_wrap(opt.meta_help,
border - len(type_indent) - 1)
for idx, line in enumerate([' '.join(t) for t in meta_help]):
if idx == 0:
line = type_indent + '(' + line
else:
line = type_indent + ' ' + line
if idx == len(meta_help) - 1:
line += ')'
line += ' ' * (border - len(line) + COL_SPACING)
left_col.append(line)
right_col = simple_wrap(help_text,
formatter.width - border - COL_SPACING)
right_col = [' '.join(self._color_important(tokens, ctx))
for tokens in right_col]
to_write = []
for left, right in itertools.zip_longest(
left_col, right_col, fillvalue=' ' * (border + COL_SPACING)):
to_write.append(left)
if right.strip():
to_write[-1] += right
formatter.write('\n'.join(to_write))
if requirements is None:
formatter.write('\n')
else:
if to_write:
if len(to_write) > 1 or ((not left_col) or left_col[0] != ''):
dangling_edge = 0
dangling_edge += click.formatting.term_len(to_write[-1])
else:
pass # dangling_edge is still correct
if dangling_edge + 1 + len(requirements) > formatter.width:
formatter.write('\n')
pad = formatter.width - len(requirements)
else:
pad = formatter.width - len(requirements) - dangling_edge
formatter.write(
(' ' * pad) + CONFIG.cfg_style(
'default_arg', requirements) + '\n')
def _color_important(self, tokens, ctx):
import re
from q2cli.core.config import CONFIG
for t in tokens:
if '_' in t:
names = self.get_option_names(ctx)
if re.sub(r'[^\w]', '', t) in names:
m = re.search(r'(\w+)', t)
word = t[m.start():m.end()]
word = CONFIG.cfg_style('emphasis', word.replace('_', '-'))
token = t[:m.start()] + word + t[m.end():]
yield token
continue
yield t
def _clean_help(self, text):
reqs = ['[required]', '[optional]', '[default: ']
requirement = None
for req in reqs:
if req in text:
requirement = req
break
else:
return text, None
req_idx = text.index(requirement)
return text[:req_idx].strip(), text[req_idx:].strip()
class ToolCommand(BaseCommandMixin, click.Command):
pass
class ToolGroupCommand(BaseCommandMixin, click.Group):
pass
def simple_wrap(text, target, start_col=0):
result = [[]]
current_line = result[0]
current_width = start_col
tokens = []
for token in text.split(' '):
if len(token) <= target:
tokens.append(token)
else:
for i in range(0, len(token), target):
tokens.append(token[i:i+target])
for token in tokens:
token_len = len(token)
if current_width + 1 + token_len > target:
current_line = [token]
result.append(current_line)
current_width = token_len
else:
result[-1].append(token)
current_width += 1 + token_len
return result
Copyright © 2014 by the Pallets team.
Some rights reserved.
Redistribution and use in source and binary forms of the software as
well as documentation, with or without modification, are permitted
provided that the following conditions are met:
- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
- Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE AND DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
----
Click uses parts of optparse written by Gregory P. Ward and maintained
by the Python Software Foundation. This is limited to code in parser.py.
Copyright © 2001-2006 Gregory P. Ward. All rights reserved.
Copyright © 2002-2006 Python Software Foundation. All rights reserved.
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
from .type import QIIME2Type
# Sentinel to avoid the situation where `None` *is* the default value.
NoDefault = {}
class GeneratedOption(click.Option):
def __init__(self, *, prefix, name, repr, ast, multiple, is_bool_flag,
metadata, metavar, default=NoDefault, description=None,
**attrs):
import q2cli.util
if metadata is not None:
prefix = 'm'
if multiple is not None:
multiple = list if multiple == 'list' else set
if is_bool_flag:
yes = q2cli.util.to_cli_name(name)
no = q2cli.util.to_cli_name('no_' + name)
opt = f'--{prefix}-{yes}/--{prefix}-{no}'
elif metadata is not None:
cli_name = q2cli.util.to_cli_name(name)
opt = f'--{prefix}-{cli_name}-file'
if metadata == 'column':
self.q2_extra_dest, self.q2_extra_opts, _ = \
self._parse_decls([f'--{prefix}-{cli_name}-column'], True)
else:
cli_name = q2cli.util.to_cli_name(name)
opt = f'--{prefix}-{cli_name}'
click_type = QIIME2Type(ast, repr, is_output=prefix == 'o')
attrs['metavar'] = metavar
attrs['multiple'] = multiple is not None
attrs['param_decls'] = [opt]
attrs['required'] = default is NoDefault
attrs['help'] = self._add_default(description, default)
if default is not NoDefault:
attrs['default'] = default
# This is to evade clicks __DEBUG__ check
if not is_bool_flag:
attrs['type'] = click_type
else:
attrs['type'] = None
super().__init__(**attrs)
# put things back the way they _should_ be after evading __DEBUG__
self.is_bool_flag = is_bool_flag
self.type = click_type
# attrs we will use elsewhere
self.q2_multiple = multiple
self.q2_prefix = prefix
self.q2_name = name
self.q2_ast = ast
self.q2_metadata = metadata
@property
def meta_help(self):
if self.q2_metadata == 'file':
return 'multiple arguments will be merged'
def _add_default(self, desc, default):
if desc is not None:
desc += ' '
else:
desc = ''
if default is not NoDefault:
if default is None:
desc += '[optional]'
else:
desc += '[default: %r]' % (default,)
return desc
def consume_value(self, ctx, opts):
if self.q2_metadata == 'column':
return self._consume_metadata(ctx, opts)
else:
return super().consume_value(ctx, opts)
def _consume_metadata(self, ctx, opts):
# double consume
md_file = super().consume_value(ctx, opts)
# consume uses self.name, so mutate but backup for after
backup, self.name = self.name, self.q2_extra_dest
md_col = super().consume_value(ctx, opts)
self.name = backup
if (md_col is None) != (md_file is None):
# missing one or the other
if md_file is None:
raise click.MissingParameter(ctx=ctx, param=self)
else:
raise click.MissingParameter(param_hint=self.q2_extra_opts,
ctx=ctx, param=self)
if md_col is None and md_file is None:
return None
else:
return (md_file, md_col)
def get_help_record(self, ctx):
record = super().get_help_record(ctx)
if self.is_bool_flag:
metavar = self.make_metavar()
if metavar:
record = (record[0] + ' ' + self.make_metavar(), record[1])
elif self.q2_metadata == 'column':
opts = (record[0], self.q2_extra_opts[0] + ' COLUMN ')
record = (opts, record[1])
return record
# Override
def add_to_parser(self, parser, ctx):
shared = dict(dest=self.name, nargs=0, obj=self)
if self.q2_metadata == 'column':
parser.add_option(self.opts, action='store', dest=self.name,
nargs=1, obj=self)
parser.add_option(self.q2_extra_opts, action='store',
dest=self.q2_extra_dest, nargs=1, obj=self)
elif self.is_bool_flag:
if self.multiple:
action = 'append_maybe'
else:
action = 'store_maybe'
parser.add_option(self.opts, action=action, const=True,
**shared)
parser.add_option(self.secondary_opts, action=action,
const=False, **shared)
elif self.multiple:
action = 'append_greedy'
parser.add_option(self.opts, action='append_greedy', **shared)
else:
super().add_to_parser(parser, ctx)
def full_process_value(self, ctx, value):
try:
return super().full_process_value(ctx, value)
except click.MissingParameter:
if not (self.q2_prefix == 'o'
and ctx.params.get('output_dir', False)):
raise
def type_cast_value(self, ctx, value):
import sys
import q2cli.util
import qiime2.sdk.util
if self.multiple:
if value == () or value is None:
return None
elif self.q2_prefix == 'i':
value = super().type_cast_value(ctx, value)
if self.q2_multiple is set:
self._check_length(value, ctx)
value = self.q2_multiple(value)
type_expr = qiime2.sdk.util.type_from_ast(self.q2_ast)
args = ', '.join(map(repr, (x.type for x in value)))
if value not in type_expr:
raise click.BadParameter(
'recieved <%s> as an argument, which is incompatible'
' with parameter type: %r' % (args, type_expr),
ctx=ctx, param=self)
return value
elif self.q2_metadata == 'file':
value = super().type_cast_value(ctx, value)
if len(value) == 1:
return value[0]
else:
try:
return value[0].merge(*value[1:])
except Exception as e:
header = ("There was an issue with merging "
"QIIME 2 Metadata:")
tb = 'stderr' if '--verbose' in sys.argv else None
q2cli.util.exit_with_error(
e, header=header, traceback=tb)
elif self.q2_prefix == 'p':
try:
if self.q2_multiple is set:
self._check_length(value, ctx)
value = qiime2.sdk.util.parse_primitive(self.q2_ast, value)
except ValueError:
args = ', '.join(map(repr, value))
expr = qiime2.sdk.util.type_from_ast(self.q2_ast)
raise click.BadParameter(
'recieved <%s> as an argument, which is incompatible'
' with parameter type: %r' % (args, expr),
ctx=ctx, param=self)
return value
return super().type_cast_value(ctx, value)
def _check_length(self, value, ctx):
import collections
counter = collections.Counter(value)
dups = ', '.join(map(repr, (v for v, n in counter.items() if n > 1)))
args = ', '.join(map(repr, value))
if dups:
raise click.BadParameter(
'recieved <%s> as an argument, which contains duplicates'
' of the following: <%s>' % (args, dups), ctx=ctx, param=self)
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Some of the source code in this file is derived from original work:
#
# Copyright (c) 2014 by the Pallets team.
#
# To see the license for the original work, see licenses/click.LICENSE.rst
# Specific reproduction and derivation of original work is marked below.
# ----------------------------------------------------------------------------
import click.parser as parser
import click.exceptions as exceptions
class Q2Option(parser.Option):
@property
def takes_value(self):
# store_maybe should take a value so that we hit the right branch
# in OptionParser._match_long_opt
return (super().takes_value or self.action == 'store_maybe'
or self.action == 'append_greedy')
def _maybe_take(self, state):
if not state.rargs:
return None
# In a more perfect world, we would have access to all long opts
# and could verify against those instead of just the prefix '--'
if state.rargs[0].startswith('--'):
return None
return state.rargs.pop(0)
# Specific technique derived from original:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L867 >
# Copyright (c) 2014 by the Pallets team.
def process(self, value, state):
# actions should update state.opts and state.order
if (self.dest in state.opts
and self.action not in ('append', 'append_const',
'append_maybe', 'append_greedy',
'count')):
raise exceptions.UsageError(
'Option %r was specified multiple times in the command.'
% self._get_opt_name())
elif self.action == 'store_maybe':
assert value == ()
value = self._maybe_take(state)
if value is None:
state.opts[self.dest] = self.const
else:
state.opts[self.dest] = value
state.order.append(self.obj) # can't forget this
elif self.action == 'append_maybe':
assert value == ()
value = self._maybe_take(state)
if value is None:
state.opts.setdefault(self.dest, []).append(self.const)
else:
while value is not None:
state.opts.setdefault(self.dest, []).append(value)
value = self._maybe_take(state)
state.order.append(self.obj) # can't forget this
elif self.action == 'append_greedy':
assert value == ()
value = self._maybe_take(state)
while value is not None:
state.opts.setdefault(self.dest, []).append(value)
value = self._maybe_take(state)
state.order.append(self.obj) # can't forget this
elif self.takes_value and value.startswith('--'):
# Error early instead of cascading the parse error to a "missing"
# parameter, which they ironically did provide
raise parser.BadOptionUsage(
self, '%s option requires an argument' % self._get_opt_name())
else:
super().process(value, state)
def _get_opt_name(self):
if hasattr(self.obj, 'secondary_opts'):
return ' / '.join(self.obj.opts + self.obj.secondary_opts)
if hasattr(self.obj, 'get_error_hint'):
return self.obj.get_error_hint(None)
return ' / '.join(self._long_opts)
class Q2Parser(parser.OptionParser):
# Modified from original:
# < https://github.com/pallets/click/blob/
# ic6042bf2607c5be22b1efef2e42a94ffd281434c/click/parser.py#L228 >
# Copyright (c) 2014 by the Pallets team.
def add_option(self, opts, dest, action=None, nargs=1, const=None,
obj=None):
"""Adds a new option named `dest` to the parser. The destination
is not inferred (unlike with optparse) and needs to be explicitly
provided. Action can be any of ``store``, ``store_const``,
``append``, ``appnd_const`` or ``count``.
The `obj` can be used to identify the option in the order list
that is returned from the parser.
"""
if obj is None:
obj = dest
opts = [parser.normalize_opt(opt, self.ctx) for opt in opts]
# BEGIN MODIFICATIONS
if action == 'store_maybe' or action == 'append_maybe':
# Specifically target this branch:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/parser.py#L341 >
# this happens to prevents click from reading any arguments itself
# because it will only "pop" off rargs[:0], which is nothing
nargs = 0
if const is None:
raise ValueError("A 'const' must be provided when action is "
"'store_maybe' or 'append_maybe'")
elif action == 'append_greedy':
nargs = 0
option = Q2Option(opts, dest, action=action, nargs=nargs,
const=const, obj=obj)
# END MODIFICATIONS
self._opt_prefixes.update(option.prefixes)
for opt in option._short_opts:
self._short_opt[opt] = option
for opt in option._long_opts:
self._long_opt[opt] = option
def parse_args(self, args):
backup = args.copy() # args will be mutated by super()
try:
return super().parse_args(args)
except exceptions.UsageError:
if '--help' in backup:
# all is forgiven
return {'help': True}, [], ['help']
raise
# Override of private member:
# < https://github.com/pallets/click/blob/
# ic6042bf2607c5be22b1efef2e42a94ffd281434c/click/parser.py#L321 >
def _match_long_opt(self, opt, explicit_value, state):
if opt not in self._long_opt:
from q2cli.util import get_close_matches
# This is way better than substring matching
possibilities = get_close_matches(opt, self._long_opt)
raise exceptions.NoSuchOption(opt, possibilities=possibilities,
ctx=self.ctx)
return super()._match_long_opt(opt, explicit_value, state)
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
def is_writable_dir(path):
import os
head = 'do-while'
path = os.path.normpath(os.path.abspath(path))
while head:
if os.path.exists(path):
if os.path.isfile(path):
return False
else:
return os.access(path, os.W_OK | os.X_OK)
path, head = os.path.split(path)
return False
class OutDirType(click.Path):
def convert(self, value, param, ctx):
import os
# Click path fails to validate writability on new paths
if os.path.exists(value):
if os.path.isfile(value):
self.fail('%r is already a file.' % (value,), param, ctx)
else:
self.fail('%r already exists, will not overwrite.' % (value,),
param, ctx)
if value[-1] != os.path.sep:
value += os.path.sep
if not is_writable_dir(value):
self.fail('%r is not a writable directory, cannot write output'
' to it.' % (value,), param, ctx)
return value
class ControlFlowException(Exception):
pass
class QIIME2Type(click.ParamType):
def __init__(self, type_ast, type_repr, is_output=False):
self.type_repr = type_repr
self.type_ast = type_ast
self.is_output = is_output
self._type_expr = None
@property
def type_expr(self):
import qiime2.sdk.util
if self._type_expr is None:
self._type_expr = qiime2.sdk.util.type_from_ast(self.type_ast)
return self._type_expr
def convert(self, value, param, ctx):
import qiime2.sdk.util
if value is None:
return None # Them's the rules
if self.is_output:
return self._convert_output(value, param, ctx)
if qiime2.sdk.util.is_semantic_type(self.type_expr):
return self._convert_input(value, param, ctx)
if qiime2.sdk.util.is_metadata_type(self.type_expr):
return self._convert_metadata(value, param, ctx)
return self._convert_primitive(value, param, ctx)
def _convert_output(self, value, param, ctx):
import os
# Click path fails to validate writability on new paths
if os.path.exists(value):
if os.path.isdir(value):
self.fail('%r is already a directory.' % (value,), param, ctx)
directory = os.path.dirname(value)
if not is_writable_dir(directory):
self.fail('%r is not a writable directory, cannot write output'
' to it.' % (directory,), param, ctx)
return value
def _convert_input(self, value, param, ctx):
import os
import tempfile
import qiime2.sdk
import qiime2.sdk.util
try:
try:
result = qiime2.sdk.Result.load(value)
except OSError as e:
if e.errno == 28:
temp = tempfile.tempdir
self.fail(f'There was not enough space left on {temp!r} '
f'to extract the artifact {value!r}. '
'(Try setting $TMPDIR to a directory with '
'more space, or increasing the size of '
f'{temp!r})', param, ctx)
else:
raise ControlFlowException
except ValueError as e:
if 'does not exist' in str(e):
self.fail(f'{value!r} is not a valid filepath', param, ctx)
else:
raise ControlFlowException
except Exception:
raise ControlFlowException
except ControlFlowException:
self.fail('%r is not a QIIME 2 Artifact (.qza)' % value, param,
ctx)
if isinstance(result, qiime2.sdk.Visualization):
maybe = value[:-1] + 'a'
hint = ''
if os.path.exists(maybe):
hint = (' (There is an artifact with the same name:'
' %r, did you mean that?)'
% os.path.basename(maybe))
self.fail('%r is a QIIME 2 visualization (.qzv), not an '
' Artifact (.qza)%s' % (value, hint), param, ctx)
style = qiime2.sdk.util.interrogate_collection_type(self.type_expr)
if style.style is None and result not in self.type_expr:
# collections need to be handled above this
self.fail("Expected an artifact of at least type %r."
" An artifact of type %r was provided."
% (self.type_expr, result.type), param, ctx)
return result
def _convert_metadata(self, value, param, ctx):
import sys
import qiime2
import q2cli.util
if self.type_expr.name == 'MetadataColumn':
value, column = value
fp = value
try:
artifact = qiime2.Artifact.load(fp)
except Exception:
try:
metadata = qiime2.Metadata.load(fp)
except Exception as e:
header = ("There was an issue with loading the file %s as "
"metadata:" % fp)
tb = 'stderr' if '--verbose' in sys.argv else None
q2cli.util.exit_with_error(e, header=header, traceback=tb)
else:
try:
metadata = artifact.view(qiime2.Metadata)
except Exception as e:
header = ("There was an issue with viewing the artifact "
"%s as QIIME 2 Metadata:" % fp)
tb = 'stderr' if '--verbose' in sys.argv else None
q2cli.util.exit_with_error(e, header=header, traceback=tb)
if self.type_expr.name != 'MetadataColumn':
return metadata
else:
try:
metadata_column = metadata.get_column(column)
except Exception:
self.fail("There was an issue with retrieving column %r from "
"the metadata:" % column)
if metadata_column not in self.type_expr:
self.fail("Metadata column is of type %r, but expected %r."
% (metadata_column.type, self.type_expr.fields[0]))
return metadata_column
def _convert_primitive(self, value, param, ctx):
import qiime2.sdk.util
return qiime2.sdk.util.parse_primitive(self.type_expr, value)
@property
def name(self):
return self.get_metavar('')
def get_type_repr(self, param):
return self.type_repr
def get_missing_message(self, param):
if self.is_output:
return '("--output-dir" may also be used)'
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
import q2cli.builtin.dev
import q2cli.builtin.info
import q2cli.builtin.tools
from q2cli.core.config import CONFIG
from q2cli.click.command import BaseCommandMixin
class RootCommand(BaseCommandMixin, click.MultiCommand):
"""This class defers to either the PluginCommand or the builtin cmds"""
_builtin_commands = {
'info': q2cli.builtin.info.info,
'tools': q2cli.builtin.tools.tools,
'dev': q2cli.builtin.dev.dev
}
def __init__(self, *args, **kwargs):
import re
import sys
unicodes = ["\u2018", "\u2019", "\u201C", "\u201D", "\u2014"]
category_regex = re.compile(r'--m-(\S+)-category')
invalid_chars = []
categories = []
for command in sys.argv:
if any(x in command for x in unicodes):
invalid_chars.append(command)
match = category_regex.fullmatch(command)
if match is not None:
param_name, = match.groups()
# Maps old-style option name to new name.
categories.append((command, '--m-%s-column' % param_name))
if invalid_chars or categories:
if invalid_chars:
click.secho("Error: Detected invalid character in: %s\n"
"Verify the correct quotes or dashes (ASCII) are "
"being used." % ', '.join(invalid_chars),
err=True, fg='red', bold=True)
if categories:
old_to_new_names = '\n'.join(
'Instead of %s, trying using %s' % (old, new)
for old, new in categories)
msg = ("Error: The following options no longer exist because "
"metadata *categories* are now called metadata "
"*columns* in QIIME 2.\n\n%s" % old_to_new_names)
click.secho(msg, err=True, fg='red', bold=True)
sys.exit(-1)
super().__init__(*args, **kwargs)
# Plugin state for current deployment that will be loaded from cache.
# Used to construct the dynamic CLI.
self._plugins = None
@property
def _plugin_lookup(self):
import q2cli.util
# See note in `q2cli.completion.write_bash_completion_script` for why
# `self._plugins` will not always be obtained from
# `q2cli.cache.CACHE.plugins`.
if self._plugins is None:
import q2cli.core.cache
self._plugins = q2cli.core.cache.CACHE.plugins
name_map = {}
for name, plugin in self._plugins.items():
if plugin['actions']:
name_map[q2cli.util.to_cli_name(name)] = plugin
return name_map
def list_commands(self, ctx):
import itertools
# Avoid sorting builtin commands as they have a predefined order based
# on applicability to users. For example, it isn't desirable to have
# the `dev` command listed before `info` and `tools`.
builtins = self._builtin_commands
plugins = sorted(self._plugin_lookup)
return itertools.chain(builtins, plugins)
def get_command(self, ctx, name):
if name in self._builtin_commands:
return self._builtin_commands[name]
try:
plugin = self._plugin_lookup[name]
except KeyError:
from q2cli.util import get_close_matches
possibilities = get_close_matches(name, self._plugin_lookup)
if len(possibilities) == 1:
hint = ' Did you mean %r?' % possibilities[0]
elif possibilities:
hint = ' (Possible commands: %s)' % ', '.join(possibilities)
else:
hint = ''
click.secho("Error: QIIME 2 has no plugin/command named %r."
% name + hint,
err=True, fg='red')
ctx.exit(2) # Match exit code of `return None`
return PluginCommand(plugin, name)
class PluginCommand(BaseCommandMixin, click.MultiCommand):
"""Provides ActionCommands based on available Actions"""
def __init__(self, plugin, name, *args, **kwargs):
import q2cli.util
# the cli currently doesn't differentiate between methods
# and visualizers, it treats them generically as Actions
self._plugin = plugin
self._action_lookup = {q2cli.util.to_cli_name(id): a for id, a in
plugin['actions'].items()}
support = 'Getting user support: %s' % plugin['user_support_text']
website = 'Plugin website: %s' % plugin['website']
description = 'Description: %s' % plugin['description']
help_ = '\n\n'.join([description, website, support])
params = [
click.Option(('--version',), is_flag=True, expose_value=False,
is_eager=True, callback=self._get_version,
help='Show the version and exit.'),
q2cli.util.citations_option(self._get_citation_records)
]
super().__init__(name, *args, short_help=plugin['short_description'],
help=help_, params=params, **kwargs)
def _get_version(self, ctx, param, value):
if not value or ctx.resilient_parsing:
return
import qiime2.sdk
for entrypoint in qiime2.sdk.PluginManager.iter_entry_points():
plugin = entrypoint.load()
if (self._plugin['name'] == plugin.name):
pkg_name = entrypoint.dist.project_name
pkg_version = entrypoint.dist.version
break
else:
pkg_name = pkg_version = "[UNKNOWN]"
click.echo(
"QIIME 2 Plugin '%s' version %s (from package '%s' version %s)"
% (self._plugin['name'], self._plugin['version'],
pkg_name, pkg_version)
)
ctx.exit()
def _get_citation_records(self):
import qiime2.sdk
pm = qiime2.sdk.PluginManager()
return pm.plugins[self._plugin['name']].citations
def list_commands(self, ctx):
return sorted(self._action_lookup)
def get_command(self, ctx, name):
try:
action = self._action_lookup[name]
except KeyError:
from q2cli.util import get_close_matches
possibilities = get_close_matches(name, self._action_lookup)
if len(possibilities) == 1:
hint = ' Did you mean %r?' % possibilities[0]
elif possibilities:
hint = ' (Possible commands: %s)' % ', '.join(possibilities)
else:
hint = ''
click.secho("Error: QIIME 2 plugin %r has no action %r."
% (self._plugin['name'], name) + hint,
err=True, fg='red')
ctx.exit(2) # Match exit code of `return None`
return ActionCommand(name, self._plugin, action)
class ActionCommand(BaseCommandMixin, click.Command):
"""A click manifestation of a QIIME 2 API Action (Method/Visualizer)
"""
def __init__(self, name, plugin, action):
import q2cli.util
import q2cli.click.type
self.plugin = plugin
self.action = action
self._inputs, self._params, self._outputs = \
self._build_generated_options()
self._misc = [
click.Option(['--output-dir'],
type=q2cli.click.type.OutDirType(),
help='Output unspecified results to a directory'),
click.Option(['--verbose / --quiet'], default=None, required=False,
help='Display verbose output to stdout and/or stderr '
'during execution of this action. Or silence '
'output if execution is successful (silence is '
'golden).'),
q2cli.util.citations_option(self._get_citation_records)
]
options = [*self._inputs, *self._params, *self._outputs, *self._misc]
help_ = [action['description']]
if self._get_action().deprecated:
help_.append(CONFIG.cfg_style(
'warning', 'WARNING:\n\nThis command is deprecated and will '
'be removed in a future version of this plugin.'))
super().__init__(name, params=options, callback=self,
short_help=action['name'], help='\n\n'.join(help_))
def _build_generated_options(self):
import q2cli.click.option
inputs = []
params = []
outputs = []
for item in self.action['signature']:
item = item.copy()
type = item.pop('type')
if type == 'input':
storage = inputs
elif type == 'parameter':
storage = params
else:
storage = outputs
opt = q2cli.click.option.GeneratedOption(prefix=type[0], **item)
storage.append(opt)
return inputs, params, outputs
def get_opt_groups(self, ctx):
return {
'Inputs': self._inputs,
'Parameters': self._params,
'Outputs': self._outputs,
'Miscellaneous': self._misc + [self.get_help_option(ctx)]
}
def _get_citation_records(self):
return self._get_action().citations
def _get_action(self):
import qiime2.sdk
pm = qiime2.sdk.PluginManager()
plugin = pm.plugins[self.plugin['name']]
return plugin.actions[self.action['id']]
def __call__(self, **kwargs):
"""Called when user hits return, **kwargs are Dict[click_names, Obj]"""
import os
import qiime2.util
output_dir = kwargs.pop('output_dir')
verbose = kwargs.pop('verbose')
if verbose is None:
verbose = False
quiet = False
elif verbose:
quiet = False
else:
quiet = True
arguments = {}
init_outputs = {}
for key, value in kwargs.items():
prefix, *parts = key.split('_')
key = '_'.join(parts)
if prefix == 'o':
if value is None:
value = os.path.join(output_dir, key)
init_outputs[key] = value
elif prefix == 'm':
arguments[key[:-len('_file')]] = value
else:
arguments[key] = value
outputs = self._order_outputs(init_outputs)
action = self._get_action()
# `qiime2.util.redirected_stdio` defaults to stdout/stderr when
# supplied `None`.
log = None
if not verbose:
import tempfile
log = tempfile.NamedTemporaryFile(prefix='qiime2-q2cli-err-',
suffix='.log',
delete=False, mode='w')
if action.deprecated:
# We don't need to worry about redirecting this, since it should a)
# always be shown to the user and b) the framework-originated
# FutureWarning will wind up in the log file in quiet mode.
msg = ('Plugin warning from %s:\n\n%s is deprecated and '
'will be removed in a future version of this plugin.' %
(q2cli.util.to_cli_name(self.plugin['name']), self.name))
click.echo(CONFIG.cfg_style('warning', msg))
cleanup_logfile = False
try:
with qiime2.util.redirected_stdio(stdout=log, stderr=log):
results = action(**arguments)
except Exception as e:
header = ('Plugin error from %s:'
% q2cli.util.to_cli_name(self.plugin['name']))
if verbose:
# log is not a file
log = 'stderr'
q2cli.util.exit_with_error(e, header=header, traceback=log)
else:
cleanup_logfile = True
finally:
# OS X will reap temporary files that haven't been touched in
# 36 hours, double check that the log is still on the filesystem
# before trying to delete. Otherwise this will fail and the
# output won't be written.
if log and cleanup_logfile and os.path.exists(log.name):
log.close()
os.remove(log.name)
if output_dir is not None:
os.makedirs(output_dir)
for result, output in zip(results, outputs):
path = result.save(output)
if not quiet:
click.secho('Saved %s to: %s' % (result.type, path),
fg='green')
def _order_outputs(self, outputs):
ordered = []
for item in self.action['signature']:
if item['type'] == 'output':
ordered.append(outputs[item['name']])
return ordered