Skip to content
Commits on Source (3)
......@@ -59,6 +59,9 @@ Sorted alphabetically by repo name.
- The q2-diversity plugin
https://github.com/qiime2/q2-diversity/issues
- The q2-diversity-lib plugin
https://github.com/qiime2/q2-diversity-lib/issues
- The q2-emperor plugin
https://github.com/qiime2/q2-emperor/issues
......
......@@ -52,6 +52,8 @@ Sorted alphabetically by repo name.
| The q2-demux plugin
- [q2-diversity](https://github.com/qiime2/q2-diversity/issues)
| The q2-diversity plugin
- [q2-diversity-lib](https://github.com/qiime2/q2-diversity-lib/issues)
| The q2-diversity-lib plugin
- [q2-emperor](https://github.com/qiime2/q2-emperor/issues)
| The q2-emperor plugin
- [q2-feature-classifier](https://github.com/qiime2/q2-feature-classifier/issues)
......
q2cli (2019.4.0-1) UNRELEASED; urgency=medium
* New upstream version
-- Liubov Chuprikova <chuprikovalv@gmail.com> Thu, 13 Jun 2019 19:22:21 +0200
q2cli (2019.1.0-1) unstable; urgency=medium
* Initial release (Closes: #925400)
......
......@@ -6,9 +6,7 @@
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
from .core import Option, option
from ._version import get_versions
__all__ = ['Option', 'option']
__version__ = get_versions()['version']
del get_versions
......@@ -23,9 +23,9 @@ def get_keywords():
# setup.py/versioneer.py will grep for the variable names, so they must
# each be defined on a line of their own. _version.py will just call
# get_keywords().
git_refnames = " (tag: 2019.1.0)"
git_full = "bd4936307955a839c754dae505722e53328a124a"
git_date = "2019-01-29 14:00:34 +0000"
git_refnames = " (tag: 2019.4.0)"
git_full = "dc80fad32777035091692ce1083088380a6ac509"
git_date = "2019-05-03 04:14:45 +0000"
keywords = {"refnames": git_refnames, "full": git_full, "date": git_date}
return keywords
......
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
......@@ -7,9 +7,11 @@
# ----------------------------------------------------------------------------
import click
from q2cli.click.command import ToolCommand, ToolGroupCommand
@click.group(help='Utilities for developers and advanced users.')
@click.group(help='Utilities for developers and advanced users.',
cls=ToolGroupCommand)
def dev():
pass
......@@ -22,7 +24,8 @@ def dev():
"is necessary because package versions do not typically "
"change each time an update is made to a package's code. "
"Setting the environment variable Q2CLIDEV to any value "
"will always refresh the cache when a command is run.")
"will always refresh the cache when a command is run.",
cls=ToolCommand)
def refresh_cache():
import q2cli.cache
q2cli.cache.CACHE.refresh()
import q2cli.core.cache
q2cli.core.cache.CACHE.refresh()
......@@ -8,7 +8,7 @@
import click
import q2cli
from q2cli.click.command import ToolCommand
def _echo_version():
......@@ -25,9 +25,9 @@ def _echo_version():
def _echo_plugins():
import q2cli.cache
import q2cli.core.cache
plugins = q2cli.cache.CACHE.plugins
plugins = q2cli.core.cache.CACHE.plugins
if plugins:
for name, plugin in sorted(plugins.items()):
click.echo('%s: %s' % (name, plugin['version']))
......@@ -36,32 +36,17 @@ def _echo_plugins():
'the official QIIME 2 plugins at https://qiime2.org')
def _echo_installed_packages():
import pip
# This code was derived from an example provide here:
# http://stackoverflow.com/a/23885252/3424666
installed_packages = sorted(["%s==%s" % (i.key, i.version)
for i in pip.get_installed_distributions()])
for e in installed_packages:
click.echo(e)
@click.command(help='Display information about current deployment.')
@q2cli.option('--py-packages', is_flag=True,
help='Display names and versions of all installed Python '
'packages.')
def info(py_packages):
@click.command(help='Display information about current deployment.',
cls=ToolCommand)
def info():
import q2cli.util
import q2cli.cache
# This import improves performance for repeated _echo_plugins
import q2cli.core.cache
click.secho('System versions', fg='green')
_echo_version()
click.secho('\nInstalled plugins', fg='green')
_echo_plugins()
if py_packages:
click.secho('\nInstalled Python packages', fg='green')
_echo_installed_packages()
click.secho('\nApplication config directory', fg='green')
click.secho(q2cli.util.get_app_dir())
......
......@@ -10,10 +10,15 @@ import os
import click
import q2cli
import q2cli.util
from q2cli.click.command import ToolCommand, ToolGroupCommand
@click.group(help='Tools for working with QIIME 2 files.')
_COMBO_METAVAR = 'ARTIFACT/VISUALIZATION'
@click.group(help='Tools for working with QIIME 2 files.',
cls=ToolGroupCommand)
def tools():
pass
......@@ -23,18 +28,18 @@ def tools():
'or a Visualization',
help='Exporting extracts (and optionally transforms) data '
'stored inside an Artifact or Visualization. Note that '
'Visualizations cannot be transformed with --output-format'
)
@q2cli.option('--input-path', required=True,
'Visualizations cannot be transformed with --output-format',
cls=ToolCommand)
@click.option('--input-path', required=True, metavar=_COMBO_METAVAR,
type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True),
help='Path to file that should be exported')
@q2cli.option('--output-path', required=True,
@click.option('--output-path', required=True,
type=click.Path(exists=False, file_okay=True, dir_okay=True,
writable=True),
help='Path to file or directory where '
'data should be exported to')
@q2cli.option('--output-format', required=False,
@click.option('--output-format', required=False,
help='Format which the data should be exported as. '
'This option cannot be used with Visualizations')
def export_data(input_path, output_path, output_format):
......@@ -112,29 +117,30 @@ def show_importable_formats(ctx, param, value):
help="Import data to create a new QIIME 2 Artifact. See "
"https://docs.qiime2.org/ for usage examples and details "
"on the file types and associated semantic types that can "
"be imported.")
@q2cli.option('--type', required=True,
"be imported.",
cls=ToolCommand)
@click.option('--type', required=True,
help='The semantic type of the artifact that will be created '
'upon importing. Use --show-importable-types to see what '
'importable semantic types are available in the current '
'deployment.')
@q2cli.option('--input-path', required=True,
@click.option('--input-path', required=True,
type=click.Path(exists=True, file_okay=True, dir_okay=True,
readable=True),
help='Path to file or directory that should be imported.')
@q2cli.option('--output-path', required=True,
@click.option('--output-path', required=True, metavar='ARTIFACT',
type=click.Path(exists=False, file_okay=True, dir_okay=False,
writable=True),
help='Path where output artifact should be written.')
@q2cli.option('--input-format', required=False,
@click.option('--input-format', required=False,
help='The format of the data to be imported. If not provided, '
'data must be in the format expected by the semantic type '
'provided via --type.')
@q2cli.option('--show-importable-types', is_flag=True, is_eager=True,
@click.option('--show-importable-types', is_flag=True, is_eager=True,
callback=show_importable_types, expose_value=False,
help='Show the semantic types that can be supplied to --type '
'to import data into an artifact.')
@q2cli.option('--show-importable-formats', is_flag=True, is_eager=True,
@click.option('--show-importable-formats', is_flag=True, is_eager=True,
callback=show_importable_formats, expose_value=False,
help='Show formats that can be supplied to --input-format to '
'import data into an artifact.')
......@@ -163,9 +169,11 @@ def import_data(type, input_path, output_path, input_format):
@tools.command(short_help='Take a peek at a QIIME 2 Artifact or '
'Visualization.',
help="Display basic information about a QIIME 2 Artifact or "
"Visualization, including its UUID and type.")
"Visualization, including its UUID and type.",
cls=ToolCommand)
@click.argument('path', type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True))
dir_okay=False, readable=True),
metavar=_COMBO_METAVAR)
def peek(path):
import qiime2.sdk
......@@ -184,10 +192,11 @@ def peek(path):
short_help='Inspect columns available in metadata.',
help='Inspect metadata files or artifacts viewable as metadata.'
' Providing multiple file paths to this command will merge'
' the metadata.')
@q2cli.option('--tsv/--no-tsv', default=False,
' the metadata.',
cls=ToolCommand)
@click.option('--tsv/--no-tsv', default=False,
help='Print as machine-readable TSV instead of text.')
@click.argument('paths', nargs=-1, required=True,
@click.argument('paths', nargs=-1, required=True, metavar='METADATA...',
type=click.Path(exists=True, file_okay=True, dir_okay=False,
readable=True))
@q2cli.util.pretty_failure(traceback=None)
......@@ -265,11 +274,12 @@ def _load_metadata(path):
@tools.command(short_help='View a QIIME 2 Visualization.',
help="Displays a QIIME 2 Visualization until the command "
"exits. To open a QIIME 2 Visualization so it can be "
"used after the command exits, use 'qiime extract'.")
@click.argument('visualization-path',
"used after the command exits, use 'qiime extract'.",
cls=ToolCommand)
@click.argument('visualization-path', metavar='VISUALIZATION',
type=click.Path(exists=True, file_okay=True, dir_okay=False,
readable=True))
@q2cli.option('--index-extension', required=False, default='html',
@click.option('--index-extension', required=False, default='html',
help='The extension of the index file that should be opened. '
'[default: html]')
def view(visualization_path, index_extension):
......@@ -337,12 +347,13 @@ def view(visualization_path, index_extension):
"Visualization's archive, including provenance, metadata, "
"and actual data. Use 'qiime tools export' to export only "
"the data stored in an Artifact or Visualization, with "
"the choice of exporting to different formats.")
@q2cli.option('--input-path', required=True,
"the choice of exporting to different formats.",
cls=ToolCommand)
@click.option('--input-path', required=True, metavar=_COMBO_METAVAR,
type=click.Path(exists=True, file_okay=True, dir_okay=False,
readable=True),
help='Path to file that should be extracted')
@q2cli.option('--output-path', required=False,
@click.option('--output-path', required=False,
type=click.Path(exists=False, file_okay=False, dir_okay=True,
writable=True),
help='Directory where archive should be extracted to '
......@@ -370,10 +381,12 @@ def extract(input_path, output_path):
'and/or more thorough validation of your data (e.g. when '
'debugging issues with your data or analyses).\n\nNote: '
'validation can take some time to complete, depending on '
'the size and type of your data.')
'the size and type of your data.',
cls=ToolCommand)
@click.argument('path', type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True))
@q2cli.option('--level', required=False, type=click.Choice(['min', 'max']),
dir_okay=False, readable=True),
metavar=_COMBO_METAVAR)
@click.option('--level', required=False, type=click.Choice(['min', 'max']),
help='Desired level of validation. "min" will perform minimal '
'validation, and "max" will perform maximal validation (at '
'the potential cost of runtime).',
......@@ -404,9 +417,11 @@ def validate(path, level):
@tools.command(short_help='Print citations for a QIIME 2 result.',
help='Print citations as a BibTex file (.bib) for a QIIME 2'
' result.')
' result.',
cls=ToolCommand)
@click.argument('path', type=click.Path(exists=True, file_okay=True,
dir_okay=False, readable=True))
dir_okay=False, readable=True),
metavar=_COMBO_METAVAR)
def citations(path):
import qiime2.sdk
import io
......
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Some of the source code in this file is derived from original work:
#
# Copyright (c) 2014 by the Pallets team.
#
# To see the license for the original work, see licenses/click.LICENSE.rst
# Specific reproduction and derivation of original work is marked below.
# ----------------------------------------------------------------------------
import click
import click.core
class BaseCommandMixin:
# Modified from original:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L867 >
# Copyright (c) 2014 by the Pallets team.
def make_parser(self, ctx):
"""Creates the underlying option parser for this command."""
from .parser import Q2Parser
parser = Q2Parser(ctx)
for param in self.get_params(ctx):
param.add_to_parser(parser, ctx)
return parser
# Modified from original:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L934 >
# Copyright (c) 2014 by the Pallets team.
def parse_args(self, ctx, args):
if isinstance(self, click.MultiCommand):
return super().parse_args(ctx, args)
errors = []
parser = self.make_parser(ctx)
skip_rest = False
for _ in range(10): # surely this is enough attempts
try:
opts, args, param_order = parser.parse_args(args=args)
break
except click.ClickException as e:
errors.append(e)
skip_rest = True
if not skip_rest:
for param in click.core.iter_params_for_processing(
param_order, self.get_params(ctx)):
try:
value, args = param.handle_parse_result(ctx, opts, args)
except click.ClickException as e:
errors.append(e)
if args and not ctx.allow_extra_args and not ctx.resilient_parsing:
errors.append(click.UsageError(
'Got unexpected extra argument%s (%s)'
% (len(args) != 1 and 's' or '',
' '.join(map(click.core.make_str, args)))))
if errors:
click.echo(ctx.get_help()+"\n", err=True)
if len(errors) > 1:
problems = 'There were some problems with the command:'
else:
problems = 'There was a problem with the command:'
click.secho(problems.center(78, ' '), fg='yellow', err=True)
for idx, e in enumerate(errors, 1):
msg = click.formatting.wrap_text(
e.format_message(),
initial_indent=' (%d/%d%s) ' % (idx, len(errors),
'?' if skip_rest else ''),
subsequent_indent=' ')
click.secho(msg, err=True, fg='red', bold=True)
ctx.exit(1)
ctx.args = args
return args
def get_option_names(self, ctx):
if not hasattr(self, '__option_names'):
names = set()
for param in self.get_params(ctx):
if hasattr(param, 'q2_name'):
names.add(param.q2_name)
else:
names.add(param.name)
self.__option_names = names
return self.__option_names
def list_commands(self, ctx):
if not hasattr(super(), 'list_commands'):
return []
return super().list_commands(ctx)
def get_opt_groups(self, ctx):
return {'Options': list(self.get_params(ctx))}
def format_help_text(self, ctx, formatter):
super().format_help_text(ctx, formatter)
formatter.write_paragraph()
# Modified from original:
# < https://github.com/pallets/click/blob
# /c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L830 >
# Copyright (c) 2014 by the Pallets team.
def format_usage(self, ctx, formatter):
"""Writes the usage line into the formatter."""
pieces = self.collect_usage_pieces(ctx)
formatter.write_usage(_style_command(ctx.command_path),
' '.join(pieces))
def format_options(self, ctx, formatter, COL_MAX=23, COL_MIN=10):
# write options
opt_groups = {}
records = []
for group, options in self.get_opt_groups(ctx).items():
opt_records = []
for o in options:
record = o.get_help_record(ctx)
if record is None:
continue
opt_records.append((o, record))
records.append(record)
opt_groups[group] = opt_records
first_columns = (r[0] for r in records)
border = min(COL_MAX, max(COL_MIN, *(len(col) for col in first_columns
if len(col) < COL_MAX)))
for opt_group, opt_records in opt_groups.items():
if not opt_records:
continue
formatter.write_heading(click.style(opt_group, bold=True))
formatter.indent()
padded_border = border + formatter.current_indent
for opt, record in opt_records:
self.write_option(ctx, formatter, opt, record, padded_border)
formatter.dedent()
# Modified from original:
# https://github.com/pallets/click/blob
# /c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L1056
# Copyright (c) 2014 by the Pallets team.
commands = []
for subcommand in self.list_commands(ctx):
cmd = self.get_command(ctx, subcommand)
# What is this, the tool lied about a command. Ignore it
if cmd is None:
continue
if cmd.hidden:
continue
commands.append((subcommand, cmd))
# allow for 3 times the default spacing
if len(commands):
limit = formatter.width - 6 - max(len(cmd[0]) for cmd in commands)
rows = []
for subcommand, cmd in commands:
help = cmd.get_short_help_str(limit)
rows.append((_style_command(subcommand), help))
if rows:
with formatter.section(click.style('Commands', bold=True)):
formatter.write_dl(rows)
def write_option(self, ctx, formatter, opt, record, border, COL_SPACING=2):
import itertools
full_width = formatter.width - formatter.current_indent
indent_text = ' ' * formatter.current_indent
opt_text, help_text = record
opt_text_secondary = None
if type(opt_text) is tuple:
opt_text, opt_text_secondary = opt_text
help_text, requirements = self._clean_help(help_text)
type_placement = None
type_repr = None
type_indent = 2 * indent_text
if hasattr(opt.type, 'get_type_repr'):
type_repr = opt.type.get_type_repr(opt)
if type_repr is not None:
if len(type_repr) <= border - len(type_indent):
type_placement = 'under'
else:
type_placement = 'beside'
if len(opt_text) > border:
lines = simple_wrap(opt_text, full_width)
else:
lines = [opt_text.split(' ')]
if opt_text_secondary is not None:
lines.append(opt_text_secondary.split(' '))
to_write = []
for tokens in lines:
dangling_edge = formatter.current_indent
styled = []
for token in tokens:
dangling_edge += len(token) + 1
if token.startswith('--'):
token = _style_option(token, required=opt.required)
styled.append(token)
line = indent_text + ' '.join(styled)
to_write.append(line)
formatter.write('\n'.join(to_write))
dangling_edge -= 1
if type_placement == 'beside':
lines = simple_wrap(type_repr, formatter.width - len(type_indent),
start_col=dangling_edge - 1)
to_write = []
first_iter = True
for tokens in lines:
line = ' '.join(tokens)
if first_iter:
dangling_edge += 1 + len(line)
line = " " + _style_type(line)
first_iter = False
else:
dangling_edge = len(type_indent) + len(line)
line = type_indent + _style_type(line)
to_write.append(line)
formatter.write('\n'.join(to_write))
if dangling_edge + 1 > border + COL_SPACING:
formatter.write('\n')
left_col = []
else:
padding = ' ' * (border + COL_SPACING - dangling_edge)
formatter.write(padding)
dangling_edge += len(padding)
left_col = [''] # jagged start
if type_placement == 'under':
padding = ' ' * (border + COL_SPACING
- len(type_repr) - len(type_indent))
line = ''.join([type_indent, _style_type(type_repr), padding])
left_col.append(line)
if hasattr(opt, 'meta_help') and opt.meta_help is not None:
meta_help = simple_wrap(opt.meta_help,
border - len(type_indent) - 1)
for idx, line in enumerate([' '.join(t) for t in meta_help]):
if idx == 0:
line = type_indent + '(' + line
else:
line = type_indent + ' ' + line
if idx == len(meta_help) - 1:
line += ')'
line += ' ' * (border - len(line) + COL_SPACING)
left_col.append(line)
right_col = simple_wrap(help_text,
formatter.width - border - COL_SPACING)
right_col = [' '.join(self._color_important(tokens, ctx))
for tokens in right_col]
to_write = []
for left, right in itertools.zip_longest(
left_col, right_col, fillvalue=' ' * (border + COL_SPACING)):
to_write.append(left)
if right.strip():
to_write[-1] += right
formatter.write('\n'.join(to_write))
if requirements is None:
formatter.write('\n')
else:
if to_write:
if len(to_write) > 1 or ((not left_col) or left_col[0] != ''):
dangling_edge = 0
dangling_edge += click.formatting.term_len(to_write[-1])
else:
pass # dangling_edge is still correct
if dangling_edge + 1 + len(requirements) > formatter.width:
formatter.write('\n')
pad = formatter.width - len(requirements)
else:
pad = formatter.width - len(requirements) - dangling_edge
formatter.write((' ' * pad) + _style_reqs(requirements) + '\n')
def _color_important(self, tokens, ctx):
import re
for t in tokens:
if '_' in t:
names = self.get_option_names(ctx)
if re.sub(r'[^\w]', '', t) in names:
m = re.search(r'(\w+)', t)
word = t[m.start():m.end()]
word = _style_emphasis(word.replace('_', '-'))
token = t[:m.start()] + word + t[m.end():]
yield token
continue
yield t
def _clean_help(self, text):
reqs = ['[required]', '[optional]', '[default: ']
requirement = None
for req in reqs:
if req in text:
requirement = req
break
else:
return text, None
req_idx = text.index(requirement)
return text[:req_idx].strip(), text[req_idx:].strip()
class ToolCommand(BaseCommandMixin, click.Command):
pass
class ToolGroupCommand(BaseCommandMixin, click.Group):
pass
def simple_wrap(text, target, start_col=0):
result = [[]]
current_line = result[0]
current_width = start_col
tokens = []
for token in text.split(' '):
if len(token) <= target:
tokens.append(token)
else:
for i in range(0, len(token), target):
tokens.append(token[i:i+target])
for token in tokens:
token_len = len(token)
if current_width + 1 + token_len > target:
current_line = [token]
result.append(current_line)
current_width = token_len
else:
result[-1].append(token)
current_width += 1 + token_len
return result
def _style_option(text, required=False):
return click.style(text, fg='blue', underline=required)
def _style_type(text):
return click.style(text, fg='green')
def _style_reqs(text):
return click.style(text, fg='magenta')
def _style_command(text):
return _style_option(text)
def _style_emphasis(text):
return click.style(text, underline=True)
Copyright © 2014 by the Pallets team.
Some rights reserved.
Redistribution and use in source and binary forms of the software as
well as documentation, with or without modification, are permitted
provided that the following conditions are met:
- Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
- Neither the name of the copyright holder nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE AND DOCUMENTATION IS PROVIDED BY THE COPYRIGHT HOLDERS AND
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE AND DOCUMENTATION, EVEN IF ADVISED OF THE POSSIBILITY OF
SUCH DAMAGE.
----
Click uses parts of optparse written by Gregory P. Ward and maintained
by the Python Software Foundation. This is limited to code in parser.py.
Copyright © 2001-2006 Gregory P. Ward. All rights reserved.
Copyright © 2002-2006 Python Software Foundation. All rights reserved.
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
from .type import QIIME2Type
# Sentinel to avoid the situation where `None` *is* the default value.
NoDefault = {}
class GeneratedOption(click.Option):
def __init__(self, *, prefix, name, repr, ast, multiple, is_bool_flag,
metadata, metavar, default=NoDefault, description=None,
**attrs):
import q2cli.util
if metadata is not None:
prefix = 'm'
if multiple is not None:
multiple = list if multiple == 'list' else set
if is_bool_flag:
yes = q2cli.util.to_cli_name(name)
no = q2cli.util.to_cli_name('no_' + name)
opt = f'--{prefix}-{yes}/--{prefix}-{no}'
elif metadata is not None:
cli_name = q2cli.util.to_cli_name(name)
opt = f'--{prefix}-{cli_name}-file'
if metadata == 'column':
self.q2_extra_dest, self.q2_extra_opts, _ = \
self._parse_decls([f'--{prefix}-{cli_name}-column'], True)
else:
cli_name = q2cli.util.to_cli_name(name)
opt = f'--{prefix}-{cli_name}'
click_type = QIIME2Type(ast, repr, is_output=prefix == 'o')
attrs['metavar'] = metavar
attrs['multiple'] = multiple is not None
attrs['param_decls'] = [opt]
attrs['required'] = default is NoDefault
attrs['help'] = self._add_default(description, default)
if default is not NoDefault:
attrs['default'] = default
# This is to evade clicks __DEBUG__ check
if not is_bool_flag:
attrs['type'] = click_type
else:
attrs['type'] = None
super().__init__(**attrs)
# put things back the way they _should_ be after evading __DEBUG__
self.is_bool_flag = is_bool_flag
self.type = click_type
# attrs we will use elsewhere
self.q2_multiple = multiple
self.q2_prefix = prefix
self.q2_name = name
self.q2_ast = ast
self.q2_metadata = metadata
@property
def meta_help(self):
if self.q2_metadata == 'file':
return 'multiple arguments will be merged'
def _add_default(self, desc, default):
if desc is not None:
desc += ' '
else:
desc = ''
if default is not NoDefault:
if default is None:
desc += '[optional]'
else:
desc += '[default: %r]' % (default,)
return desc
def consume_value(self, ctx, opts):
if self.q2_metadata == 'column':
return self._consume_metadata(ctx, opts)
else:
return super().consume_value(ctx, opts)
def _consume_metadata(self, ctx, opts):
# double consume
md_file = super().consume_value(ctx, opts)
# consume uses self.name, so mutate but backup for after
backup, self.name = self.name, self.q2_extra_dest
md_col = super().consume_value(ctx, opts)
self.name = backup
if (md_col is None) != (md_file is None):
# missing one or the other
if md_file is None:
raise click.MissingParameter(ctx=ctx, param=self)
else:
raise click.MissingParameter(param_hint=self.q2_extra_opts,
ctx=ctx, param=self)
if md_col is None and md_file is None:
return None
else:
return (md_file, md_col)
def get_help_record(self, ctx):
record = super().get_help_record(ctx)
if self.is_bool_flag:
metavar = self.make_metavar()
if metavar:
record = (record[0] + ' ' + self.make_metavar(), record[1])
elif self.q2_metadata == 'column':
opts = (record[0], self.q2_extra_opts[0] + ' COLUMN ')
record = (opts, record[1])
return record
# Override
def add_to_parser(self, parser, ctx):
shared = dict(dest=self.name, nargs=0, obj=self)
if self.q2_metadata == 'column':
parser.add_option(self.opts, action='store', dest=self.name,
nargs=1, obj=self)
parser.add_option(self.q2_extra_opts, action='store',
dest=self.q2_extra_dest, nargs=1, obj=self)
elif self.is_bool_flag:
if self.multiple:
action = 'append_maybe'
else:
action = 'store_maybe'
parser.add_option(self.opts, action=action, const=True,
**shared)
parser.add_option(self.secondary_opts, action=action,
const=False, **shared)
elif self.multiple:
action = 'append_greedy'
parser.add_option(self.opts, action='append_greedy', **shared)
else:
super().add_to_parser(parser, ctx)
def full_process_value(self, ctx, value):
try:
return super().full_process_value(ctx, value)
except click.MissingParameter:
if not (self.q2_prefix == 'o'
and ctx.params.get('output_dir', False)):
raise
def type_cast_value(self, ctx, value):
import sys
import q2cli.util
import qiime2.sdk.util
if self.multiple:
if value == () or value is None:
return None
elif self.q2_prefix == 'i':
value = super().type_cast_value(ctx, value)
if self.q2_multiple is set:
self._check_length(value, ctx)
value = self.q2_multiple(value)
type_expr = qiime2.sdk.util.type_from_ast(self.q2_ast)
args = ', '.join(map(repr, (x.type for x in value)))
if value not in type_expr:
raise click.BadParameter(
'recieved <%s> as an argument, which is incompatible'
' with parameter type: %r' % (args, type_expr),
ctx=ctx, param=self)
return value
elif self.q2_metadata == 'file':
value = super().type_cast_value(ctx, value)
if len(value) == 1:
return value[0]
else:
try:
return value[0].merge(*value[1:])
except Exception as e:
header = ("There was an issue with merging "
"QIIME 2 Metadata:")
tb = 'stderr' if '--verbose' in sys.argv else None
q2cli.util.exit_with_error(
e, header=header, traceback=tb)
elif self.q2_prefix == 'p':
try:
if self.q2_multiple is set:
self._check_length(value, ctx)
value = qiime2.sdk.util.parse_primitive(self.q2_ast, value)
except ValueError:
args = ', '.join(map(repr, value))
expr = qiime2.sdk.util.type_from_ast(self.q2_ast)
raise click.BadParameter(
'recieved <%s> as an argument, which is incompatible'
' with parameter type: %r' % (args, expr),
ctx=ctx, param=self)
return value
return super().type_cast_value(ctx, value)
def _check_length(self, value, ctx):
import collections
counter = collections.Counter(value)
dups = ', '.join(map(repr, (v for v, n in counter.items() if n > 1)))
args = ', '.join(map(repr, value))
if dups:
raise click.BadParameter(
'recieved <%s> as an argument, which contains duplicates'
' of the following: <%s>' % (args, dups), ctx=ctx, param=self)
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
# ----------------------------------------------------------------------------
# Some of the source code in this file is derived from original work:
#
# Copyright (c) 2014 by the Pallets team.
#
# To see the license for the original work, see licenses/click.LICENSE.rst
# Specific reproduction and derivation of original work is marked below.
# ----------------------------------------------------------------------------
import click.parser as parser
import click.exceptions as exceptions
class Q2Option(parser.Option):
@property
def takes_value(self):
# store_maybe should take a value so that we hit the right branch
# in OptionParser._match_long_opt
return (super().takes_value or self.action == 'store_maybe'
or self.action == 'append_greedy')
def _maybe_take(self, state):
if not state.rargs:
return None
# In a more perfect world, we would have access to all long opts
# and could verify against those instead of just the prefix '--'
if state.rargs[0].startswith('--'):
return None
return state.rargs.pop(0)
# Specific technique derived from original:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/core.py#L867 >
# Copyright (c) 2014 by the Pallets team.
def process(self, value, state):
# actions should update state.opts and state.order
if (self.dest in state.opts
and self.action not in ('append', 'append_const',
'append_maybe', 'append_greedy',
'count')):
raise exceptions.UsageError(
'Option %r was specified multiple times in the command.'
% self._get_opt_name())
elif self.action == 'store_maybe':
assert value == ()
value = self._maybe_take(state)
if value is None:
state.opts[self.dest] = self.const
else:
state.opts[self.dest] = value
state.order.append(self.obj) # can't forget this
elif self.action == 'append_maybe':
assert value == ()
value = self._maybe_take(state)
if value is None:
state.opts.setdefault(self.dest, []).append(self.const)
else:
while value is not None:
state.opts.setdefault(self.dest, []).append(value)
value = self._maybe_take(state)
state.order.append(self.obj) # can't forget this
elif self.action == 'append_greedy':
assert value == ()
value = self._maybe_take(state)
while value is not None:
state.opts.setdefault(self.dest, []).append(value)
value = self._maybe_take(state)
state.order.append(self.obj) # can't forget this
elif self.takes_value and value.startswith('--'):
# Error early instead of cascading the parse error to a "missing"
# parameter, which they ironically did provide
raise parser.BadOptionUsage(
self, '%s option requires an argument' % self._get_opt_name())
else:
super().process(value, state)
def _get_opt_name(self):
if hasattr(self.obj, 'secondary_opts'):
return ' / '.join(self.obj.opts + self.obj.secondary_opts)
if hasattr(self.obj, 'get_error_hint'):
return self.obj.get_error_hint(None)
return ' / '.join(self._long_opts)
class Q2Parser(parser.OptionParser):
# Modified from original:
# < https://github.com/pallets/click/blob/
# ic6042bf2607c5be22b1efef2e42a94ffd281434c/click/parser.py#L228 >
# Copyright (c) 2014 by the Pallets team.
def add_option(self, opts, dest, action=None, nargs=1, const=None,
obj=None):
"""Adds a new option named `dest` to the parser. The destination
is not inferred (unlike with optparse) and needs to be explicitly
provided. Action can be any of ``store``, ``store_const``,
``append``, ``appnd_const`` or ``count``.
The `obj` can be used to identify the option in the order list
that is returned from the parser.
"""
if obj is None:
obj = dest
opts = [parser.normalize_opt(opt, self.ctx) for opt in opts]
# BEGIN MODIFICATIONS
if action == 'store_maybe' or action == 'append_maybe':
# Specifically target this branch:
# < https://github.com/pallets/click/blob/
# c6042bf2607c5be22b1efef2e42a94ffd281434c/click/parser.py#L341 >
# this happens to prevents click from reading any arguments itself
# because it will only "pop" off rargs[:0], which is nothing
nargs = 0
if const is None:
raise ValueError("A 'const' must be provided when action is "
"'store_maybe' or 'append_maybe'")
elif action == 'append_greedy':
nargs = 0
option = Q2Option(opts, dest, action=action, nargs=nargs,
const=const, obj=obj)
# END MODIFICATIONS
self._opt_prefixes.update(option.prefixes)
for opt in option._short_opts:
self._short_opt[opt] = option
for opt in option._long_opts:
self._long_opt[opt] = option
def parse_args(self, args):
backup = args.copy() # args will be mutated by super()
try:
return super().parse_args(args)
except exceptions.UsageError:
if '--help' in backup:
# all is forgiven
return {'help': True}, [], ['help']
raise
# Override of private member:
# < https://github.com/pallets/click/blob/
# ic6042bf2607c5be22b1efef2e42a94ffd281434c/click/parser.py#L321 >
def _match_long_opt(self, opt, explicit_value, state):
if opt not in self._long_opt:
from q2cli.util import get_close_matches
# This is way better than substring matching
possibilities = get_close_matches(opt, self._long_opt)
raise exceptions.NoSuchOption(opt, possibilities=possibilities,
ctx=self.ctx)
return super()._match_long_opt(opt, explicit_value, state)
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
def is_writable_dir(path):
import os
head = 'do-while'
path = os.path.normpath(os.path.abspath(path))
while head:
if os.path.exists(path):
if os.path.isfile(path):
return False
else:
return os.access(path, os.W_OK | os.X_OK)
path, head = os.path.split(path)
return False
class OutDirType(click.Path):
def convert(self, value, param, ctx):
import os
# Click path fails to validate writability on new paths
if os.path.exists(value):
if os.path.isfile(value):
self.fail('%r is already a file.' % (value,), param, ctx)
else:
self.fail('%r already exists, will not overwrite.' % (value,),
param, ctx)
if value[-1] != os.path.sep:
value += os.path.sep
if not is_writable_dir(value):
self.fail('%r is not a writable directory, cannot write output'
' to it.' % (value,), param, ctx)
return value
class QIIME2Type(click.ParamType):
def __init__(self, type_ast, type_repr, is_output=False):
self.type_repr = type_repr
self.type_ast = type_ast
self.is_output = is_output
self._type_expr = None
@property
def type_expr(self):
import qiime2.sdk.util
if self._type_expr is None:
self._type_expr = qiime2.sdk.util.type_from_ast(self.type_ast)
return self._type_expr
def convert(self, value, param, ctx):
import qiime2.sdk.util
if value is None:
return None # Them's the rules
if self.is_output:
return self._convert_output(value, param, ctx)
if qiime2.sdk.util.is_semantic_type(self.type_expr):
return self._convert_input(value, param, ctx)
if qiime2.sdk.util.is_metadata_type(self.type_expr):
return self._convert_metadata(value, param, ctx)
return self._convert_primitive(value, param, ctx)
def _convert_output(self, value, param, ctx):
import os
# Click path fails to validate writability on new paths
if os.path.exists(value):
if os.path.isdir(value):
self.fail('%r is already a directory.' % (value,), param, ctx)
directory = os.path.dirname(value)
if not is_writable_dir(directory):
self.fail('%r is not a writable directory, cannot write output'
' to it.' % (directory,), param, ctx)
return value
def _convert_input(self, value, param, ctx):
import os
import qiime2.sdk
import qiime2.sdk.util
try:
result = qiime2.sdk.Result.load(value)
except Exception:
self.fail('%r is not a QIIME 2 Artifact (.qza)' % value,
param, ctx)
if isinstance(result, qiime2.sdk.Visualization):
maybe = value[:-1] + 'a'
hint = ''
if os.path.exists(maybe):
hint = (' (There is an artifact with the same name:'
' %r, did you mean that?)'
% os.path.basename(maybe))
self.fail('%r is a QIIME 2 visualization (.qzv), not an '
' Artifact (.qza)%s' % (value, hint), param, ctx)
style = qiime2.sdk.util.interrogate_collection_type(self.type_expr)
if style.style is None and result not in self.type_expr:
# collections need to be handled above this
self.fail("Expected an artifact of at least type %r."
" An artifact of type %r was provided."
% (self.type_expr, result.type), param, ctx)
return result
def _convert_metadata(self, value, param, ctx):
import sys
import qiime2
import q2cli.util
if self.type_expr.name == 'MetadataColumn':
value, column = value
fp = value
try:
artifact = qiime2.Artifact.load(fp)
except Exception:
try:
metadata = qiime2.Metadata.load(fp)
except Exception as e:
header = ("There was an issue with loading the file %s as "
"metadata:" % fp)
tb = 'stderr' if '--verbose' in sys.argv else None
q2cli.util.exit_with_error(e, header=header, traceback=tb)
else:
try:
metadata = artifact.view(qiime2.Metadata)
except Exception as e:
header = ("There was an issue with viewing the artifact "
"%s as QIIME 2 Metadata:" % fp)
tb = 'stderr' if '--verbose' in sys.argv else None
q2cli.util.exit_with_error(e, header=header, traceback=tb)
if self.type_expr.name != 'MetadataColumn':
return metadata
else:
try:
metadata_column = metadata.get_column(column)
except Exception:
self.fail("There was an issue with retrieving column %r from "
"the metadata:" % column)
if metadata_column not in self.type_expr:
self.fail("Metadata column is of type %r, but expected %r."
% (metadata_column.type, self.type_expr.fields[0]))
return metadata_column
def _convert_primitive(self, value, param, ctx):
import qiime2.sdk.util
return qiime2.sdk.util.parse_primitive(self.type_expr, value)
@property
def name(self):
return self.get_metavar('')
def get_type_repr(self, param):
return self.type_repr
def get_missing_message(self, param):
if self.is_output:
return '("--output-dir" may also be used)'
......@@ -6,22 +6,22 @@
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import collections
import click
import q2cli.dev
import q2cli.info
import q2cli.tools
import q2cli.builtin.dev
import q2cli.builtin.info
import q2cli.builtin.tools
from q2cli.click.command import BaseCommandMixin
class RootCommand(click.MultiCommand):
class RootCommand(BaseCommandMixin, click.MultiCommand):
"""This class defers to either the PluginCommand or the builtin cmds"""
_builtin_commands = collections.OrderedDict([
('info', q2cli.info.info),
('tools', q2cli.tools.tools),
('dev', q2cli.dev.dev)
])
_builtin_commands = {
'info': q2cli.builtin.info.info,
'tools': q2cli.builtin.tools.tools,
'dev': q2cli.builtin.dev.dev
}
def __init__(self, *args, **kwargs):
import re
......@@ -72,8 +72,8 @@ class RootCommand(click.MultiCommand):
# `self._plugins` will not always be obtained from
# `q2cli.cache.CACHE.plugins`.
if self._plugins is None:
import q2cli.cache
self._plugins = q2cli.cache.CACHE.plugins
import q2cli.core.cache
self._plugins = q2cli.core.cache.CACHE.plugins
name_map = {}
for name, plugin in self._plugins.items():
......@@ -98,12 +98,25 @@ class RootCommand(click.MultiCommand):
try:
plugin = self._plugin_lookup[name]
except KeyError:
return None
from q2cli.util import get_close_matches
possibilities = get_close_matches(name, self._plugin_lookup)
if len(possibilities) == 1:
hint = ' Did you mean %r?' % possibilities[0]
elif possibilities:
hint = ' (Possible commands: %s)' % ', '.join(possibilities)
else:
hint = ''
click.secho("Error: QIIME 2 has no plugin/command named %r."
% name + hint,
err=True, fg='red')
ctx.exit(2) # Match exit code of `return None`
return PluginCommand(plugin, name)
class PluginCommand(click.MultiCommand):
class PluginCommand(BaseCommandMixin, click.MultiCommand):
"""Provides ActionCommands based on available Actions"""
def __init__(self, plugin, name, *args, **kwargs):
import q2cli.util
......@@ -133,8 +146,21 @@ class PluginCommand(click.MultiCommand):
if not value or ctx.resilient_parsing:
return
click.echo('%s version %s' % (self._plugin['name'],
self._plugin['version']))
import qiime2.sdk
for entrypoint in qiime2.sdk.PluginManager.iter_entry_points():
plugin = entrypoint.load()
if (self._plugin['name'] == plugin.name):
pkg_name = entrypoint.dist.project_name
pkg_version = entrypoint.dist.version
break
else:
pkg_name = pkg_version = "[UNKNOWN]"
click.echo(
"QIIME 2 Plugin '%s' version %s (from package '%s' version %s)"
% (self._plugin['name'], self._plugin['version'],
pkg_name, pkg_version)
)
ctx.exit()
def _get_citation_records(self):
......@@ -149,82 +175,84 @@ class PluginCommand(click.MultiCommand):
try:
action = self._action_lookup[name]
except KeyError:
click.echo("Error: QIIME 2 plugin %r has no action %r."
% (self._plugin['name'], name), err=True)
from q2cli.util import get_close_matches
possibilities = get_close_matches(name, self._action_lookup)
if len(possibilities) == 1:
hint = ' Did you mean %r?' % possibilities[0]
elif possibilities:
hint = ' (Possible commands: %s)' % ', '.join(possibilities)
else:
hint = ''
click.secho("Error: QIIME 2 plugin %r has no action %r."
% (self._plugin['name'], name) + hint,
err=True, fg='red')
ctx.exit(2) # Match exit code of `return None`
return ActionCommand(name, self._plugin, action)
class ActionCommand(click.Command):
class ActionCommand(BaseCommandMixin, click.Command):
"""A click manifestation of a QIIME 2 API Action (Method/Visualizer)
The ActionCommand generates Handlers which map from 1 Action API parameter
to one or more Click.Options.
MetaHandlers are handlers which are not mapped to an API parameter, they
are handled explicitly and generally return a `fallback` function which
can be used to supplement value lookup in the regular handlers.
"""
def __init__(self, name, plugin, action):
import q2cli.handlers
import q2cli.util
import q2cli.click.type
self.plugin = plugin
self.action = action
self.generated_handlers = self.build_generated_handlers()
self.verbose_handler = q2cli.handlers.VerboseHandler()
self.quiet_handler = q2cli.handlers.QuietHandler()
# Meta-Handlers:
self.output_dir_handler = q2cli.handlers.OutputDirHandler()
self.cmd_config_handler = q2cli.handlers.CommandConfigHandler(
q2cli.util.to_cli_name(plugin['name']),
q2cli.util.to_cli_name(self.action['id'])
)
super().__init__(name, params=list(self.get_click_parameters()),
callback=self, short_help=action['name'],
help=action['description'])
def build_generated_handlers(self):
import q2cli.handlers
self._inputs, self._params, self._outputs = \
self._build_generated_options()
self._misc = [
click.Option(['--output-dir'],
type=q2cli.click.type.OutDirType(),
help='Output unspecified results to a directory'),
click.Option(['--verbose / --quiet'], default=None, required=False,
help='Display verbose output to stdout and/or stderr '
'during execution of this action. Or silence '
'output if execution is successful (silence is '
'golden).'),
q2cli.util.citations_option(self._get_citation_records)
]
handler_map = {
'input': q2cli.handlers.ArtifactHandler,
'parameter': q2cli.handlers.parameter_handler_factory,
'output': q2cli.handlers.ResultHandler
}
options = [*self._inputs, *self._params, *self._outputs, *self._misc]
super().__init__(name, params=options, callback=self,
short_help=action['name'], help=action['description'])
def _build_generated_options(self):
import q2cli.click.option
inputs = []
params = []
outputs = []
handlers = collections.OrderedDict()
for item in self.action['signature']:
item = item.copy()
type = item.pop('type')
if item['ast']['type'] == 'collection':
inner_handler = handler_map[type](**item)
handler = q2cli.handlers.CollectionHandler(inner_handler,
**item)
if type == 'input':
storage = inputs
elif type == 'parameter':
storage = params
else:
handler = handler_map[type](**item)
handlers[item['name']] = handler
return handlers
def get_click_parameters(self):
import q2cli.util
# Handlers may provide more than one click.Option
for handler in self.generated_handlers.values():
yield from handler.get_click_options()
storage = outputs
# Meta-Handlers' Options:
yield from self.output_dir_handler.get_click_options()
yield from self.cmd_config_handler.get_click_options()
opt = q2cli.click.option.GeneratedOption(prefix=type[0], **item)
storage.append(opt)
yield from self.verbose_handler.get_click_options()
yield from self.quiet_handler.get_click_options()
return inputs, params, outputs
yield q2cli.util.citations_option(self._get_citation_records)
def get_opt_groups(self, ctx):
return {
'Inputs': self._inputs,
'Parameters': self._params,
'Outputs': self._outputs,
'Miscellaneous': self._misc + [self.get_help_option(ctx)]
}
def _get_citation_records(self):
return self._get_action().citations
......@@ -237,26 +265,35 @@ class ActionCommand(click.Command):
def __call__(self, **kwargs):
"""Called when user hits return, **kwargs are Dict[click_names, Obj]"""
import itertools
import os
import qiime2.util
arguments, missing_in, verbose, quiet = self.handle_in_params(kwargs)
outputs, missing_out = self.handle_out_params(kwargs)
if missing_in or missing_out:
# A new context is generated for a callback, which will result in
# the ctx.command_path duplicating the action, so just use the
# parent so we can print the help *within* a callback.
ctx = click.get_current_context().parent
click.echo(ctx.get_help()+"\n", err=True)
for option in itertools.chain(missing_in, missing_out):
click.secho("Error: Missing option: --%s" % option, err=True,
fg='red', bold=True)
if missing_out:
click.echo(_OUTPUT_OPTION_ERR_MSG, err=True)
ctx.exit(1)
output_dir = kwargs.pop('output_dir')
verbose = kwargs.pop('verbose')
if verbose is None:
verbose = False
quiet = False
elif verbose:
quiet = False
else:
quiet = True
arguments = {}
init_outputs = {}
for key, value in kwargs.items():
prefix, *parts = key.split('_')
key = '_'.join(parts)
if prefix == 'o':
if value is None:
value = os.path.join(output_dir, key)
init_outputs[key] = value
elif prefix == 'm':
arguments[key[:-len('_file')]] = value
else:
arguments[key] = value
outputs = self._order_outputs(init_outputs)
action = self._get_action()
# `qiime2.util.redirected_stdio` defaults to stdout/stderr when
# supplied `None`.
......@@ -290,77 +327,18 @@ class ActionCommand(click.Command):
log.close()
os.remove(log.name)
if output_dir is not None:
os.makedirs(output_dir)
for result, output in zip(results, outputs):
path = result.save(output)
if not quiet:
click.secho('Saved %s to: %s' % (result.type, path),
fg='green')
def handle_in_params(self, kwargs):
import q2cli.handlers
arguments = {}
missing = []
cmd_fallback = self.cmd_config_handler.get_value(kwargs)
verbose = self.verbose_handler.get_value(kwargs, fallback=cmd_fallback)
quiet = self.quiet_handler.get_value(kwargs, fallback=cmd_fallback)
if verbose and quiet:
click.secho('Unsure of how to be quiet and verbose at the '
'same time.', fg='red', bold=True, err=True)
click.get_current_context().exit(1)
for item in self.action['signature']:
if item['type'] == 'input' or item['type'] == 'parameter':
name = item['name']
handler = self.generated_handlers[name]
try:
if isinstance(handler,
(q2cli.handlers.MetadataHandler,
q2cli.handlers.MetadataColumnHandler)):
arguments[name] = handler.get_value(
verbose, kwargs, fallback=cmd_fallback)
else:
arguments[name] = handler.get_value(
kwargs, fallback=cmd_fallback)
except q2cli.handlers.ValueNotFoundException:
missing += handler.missing
return arguments, missing, verbose, quiet
def handle_out_params(self, kwargs):
import q2cli.handlers
outputs = []
missing = []
cmd_fallback = self.cmd_config_handler.get_value(kwargs)
out_fallback = self.output_dir_handler.get_value(
kwargs, fallback=cmd_fallback
)
def fallback(*args):
try:
return cmd_fallback(*args)
except q2cli.handlers.ValueNotFoundException:
return out_fallback(*args)
def _order_outputs(self, outputs):
ordered = []
for item in self.action['signature']:
if item['type'] == 'output':
name = item['name']
handler = self.generated_handlers[name]
try:
outputs.append(handler.get_value(kwargs,
fallback=fallback))
except q2cli.handlers.ValueNotFoundException:
missing += handler.missing
return outputs, missing
_OUTPUT_OPTION_ERR_MSG = """\
Note: When only providing names for a subset of the output Artifacts or
Visualizations, you must specify an output directory through use of the
--output-dir DIRECTORY flag.\
"""
ordered.append(outputs[item['name']])
return ordered
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
import click
import q2cli.util
class Option(click.Option):
"""``click.Option`` with customized behavior for q2cli.
Note to q2cli developers: you'll generally want to use this class and its
corresponding decorator (``@q2cli.option``) over ``click.Option`` and
``@click.option`` to keep a consistent CLI behavior across commands. This
class and decorator are designed to be drop-in replacements for their Click
counterparts.
"""
def __init__(self, param_decls=None, **attrs):
if 'multiple' not in attrs and 'count' not in attrs:
self._disallow_repeated_options(attrs)
super().__init__(param_decls=param_decls, **attrs)
def _disallow_repeated_options(self, attrs):
"""Prevent option from being repeated on the command line.
Click allows options to be repeated on the command line and stores the
value of the last specified option (this is to support overriding
options set in shell aliases). While this is common behavior in CLI
tools, it is prevented in q2cli to avoid confusion with options that
are intended to be supplied multiple times (``multiple=True``; Click
calls these "multiple options"). QIIME 2 metadata is an example of a
"multiple option" in q2cli.
References
----------
.. [1] http://click.pocoo.org/6/options/#multiple-options
"""
# General strategy:
#
# Make this option a "multiple option" (``multiple=True``) and use a
# callback to unpack the stored values and assert that only a single
# value was supplied.
# Use the user-supplied callback or define a passthrough callback if
# one wasn't supplied.
if 'callback' in attrs:
callback = attrs['callback']
else:
def callback(ctx, param, value):
return value
# Wrap the callback to intercept stored values so that they can be
# unpacked and validated.
def callback_wrapper(ctx, param, value):
# When `multiple=True` Click will use an empty tuple to represent
# the absence of the option instead of `None`.
if value == ():
value = None
if not value or ctx.resilient_parsing:
return callback(ctx, param, value)
# Empty/null case is handled above, so attempt to unpack the value.
try:
value, = value
except ValueError:
click.echo(ctx.get_usage() + '\n', err=True)
click.secho(
"Error: Option --%s was specified multiple times in the "
"command." % q2cli.util.to_cli_name(param.name),
err=True, fg='red', bold=True)
ctx.exit(1)
return callback(ctx, param, value)
# Promote this option to a "multiple option" and use the callback
# wrapper to make it behave like a regular "single" option.
attrs['callback'] = callback_wrapper
attrs['multiple'] = True
# If the user set a default, promote it to a "multiple option" default
# by putting it in a list. A default of `None` is a special case that
# can't be promoted.
if 'default' in attrs and attrs['default'] is not None:
attrs['default'] = [attrs['default']]
# Modeled after `click.option` decorator.
def option(*param_decls, **attrs):
"""``@click.option`` decorator with customized behavior for q2cli.
See docstring on ``q2cli.Option`` (above) for details.
"""
if 'cls' in attrs:
raise ValueError("Cannot override `cls=q2cli.Option` in `attrs`.")
attrs['cls'] = Option
def decorator(f):
return click.option(*param_decls, **attrs)(f)
return decorator
class MultipleType(click.ParamType):
"""This is just a wrapper, it doesn't do anything on its own"""
def __init__(self, param_type):
self.param_type = param_type
@property
def name(self):
return "MULTIPLE " + self.param_type.name
def convert(self, value, param, ctx):
# Don't convert anything yet.
return value
def fail(self, *args, **kwargs):
return self.param_type.fail(*args, **kwargs)
def get_missing_message(self, *args, **kwargs):
return self.param_type.get_missing_message(*args, **kwargs)
def get_metavar(self, *args, **kwargs):
metavar = self.param_type.get_metavar(*args, **kwargs)
if metavar is None:
return None
return "MULTIPLE " + metavar
class ResultPath(click.Path):
def __init__(self, repr, *args, **kwargs):
super().__init__(*args, **kwargs)
self.repr = repr
def get_metavar(self, param):
if self.repr != 'Visualization':
return "ARTIFACT PATH " + self.repr
return "VISUALIZATION PATH"
# ----------------------------------------------------------------------------
# Copyright (c) 2016-2019, QIIME 2 development team.
#
# Distributed under the terms of the Modified BSD License.
#
# The full license is in the file LICENSE, distributed with this software.
# ----------------------------------------------------------------------------
......@@ -188,7 +188,7 @@ class DeploymentCache:
import json
import os.path
import click
import q2cli.completion
import q2cli.core.completion
import q2cli.util
click.secho(
......@@ -203,7 +203,7 @@ class DeploymentCache:
with open(path, 'w') as fh:
json.dump(state, fh)
q2cli.completion.write_bash_completion_script(
q2cli.core.completion.write_bash_completion_script(
state['plugins'], q2cli.util.get_completion_path())
# Write requirements file last because the above steps may raise errors
......@@ -276,7 +276,7 @@ class DeploymentCache:
sig = action.signature
for name, spec in itertools.chain(sig.signature_order.items(),
sig.outputs.items()):
data = {'name': name, 'repr': repr(spec.qiime_type),
data = {'name': name, 'repr': self._get_type_repr(spec.qiime_type),
'ast': spec.qiime_type.to_ast()}
if name in sig.inputs:
......@@ -292,10 +292,109 @@ class DeploymentCache:
if spec.has_default():
data['default'] = spec.default
data['metavar'] = self._get_metavar(spec.qiime_type)
data['multiple'], data['is_bool_flag'], data['metadata'] = \
self._special_option_flags(spec.qiime_type)
state['signature'].append(data)
return state
def _special_option_flags(self, type):
import qiime2.sdk.util
import itertools
multiple = None
is_bool_flag = False
metadata = None
style = qiime2.sdk.util.interrogate_collection_type(type)
if style.style is not None:
multiple = style.view.__name__
if style.style == 'simple':
names = {style.members.name, }
elif style.style == 'complex':
names = {m.name for m in
itertools.chain.from_iterable(style.members)}
else: # composite or monomorphic
names = {v.name for v in style.members}
if 'Bool' in names:
is_bool_flag = True
else: # not collection
expr = style.expr
if expr.name == 'Metadata':
multiple = 'list'
metadata = 'file'
elif expr.name == 'MetadataColumn':
metadata = 'column'
elif expr.name == 'Bool':
is_bool_flag = True
return multiple, is_bool_flag, metadata
def _get_type_repr(self, type):
import qiime2.sdk.util
type_repr = repr(type)
style = qiime2.sdk.util.interrogate_collection_type(type)
if not qiime2.sdk.util.is_semantic_type(type):
if style.style is None:
if style.expr.predicate is not None:
type_repr = repr(style.expr.predicate)
elif not type.fields:
type_repr = None
elif style.style == 'simple':
if style.members.predicate is not None:
type_repr = repr(style.members.predicate)
return type_repr
def _get_metavar(self, type):
import qiime2.sdk.util
name_to_var = {
'Visualization': 'VISUALIZATION',
'Int': 'INTEGER',
'Str': 'TEXT',
'Float': 'NUMBER',
'Bool': '',
}
style = qiime2.sdk.util.interrogate_collection_type(type)
multiple = style.style is not None
if style.style == 'simple':
inner_type = style.members
elif not multiple:
inner_type = type
else:
inner_type = None
if qiime2.sdk.util.is_semantic_type(type):
metavar = 'ARTIFACT'
elif qiime2.sdk.util.is_metadata_type(type):
metavar = 'METADATA'
elif style.style is not None and style.style != 'simple':
metavar = 'VALUE'
else:
metavar = name_to_var[inner_type.name]
if (metavar == 'NUMBER' and inner_type is not None
and inner_type.predicate is not None
and inner_type.predicate.template.start == 0
and inner_type.predicate.template.end == 1):
metavar = 'PROPORTION'
if multiple or type.name == 'Metadata':
if metavar != 'TEXT' and metavar != '' and metavar != 'METADATA':
metavar += 'S'
metavar += '...'
return metavar
# Singleton. Import and use this instance as necessary.
CACHE = DeploymentCache()
......@@ -73,6 +73,8 @@ def _generate_command_reply(cmd):
if isinstance(param, click.Option):
options.extend(param.opts)
options.extend(param.secondary_opts)
if hasattr(param, 'q2_extra_opts'):
options.extend(param.q2_extra_opts)
subcmd_names = []
if isinstance(cmd, click.MultiCommand):
......