Commit 50a3750d authored by Aaron Tan's avatar Aaron Tan Committed by GitHub

Merge pull request #323 from jangsutsr/319_fix_workflow_job_launch_broken

Fix workflow job launch broken after refactoring.
parents d6630465 f9b951b0
from tower_cli import get_resource
from tower_cli.api import client
from tests.compat import unittest, mock
class WorkflowJobTest(unittest.TestCase):
def setUp(self):
self.res = get_resource('workflow_job')
def test_lookup_stdout(self):
with client.test_mode as t:
t.register_json('/unified_jobs/?order_by=finished&status__in=successful%2Cfailed%2Cerror',
{'count': 2,
'results': [
{'id': 1, 'name': 'Durham, NC'},
{'id': 2, 'name': 'Austin, TX'}
],
'next': None, 'previous': None})
ret = self.res.lookup_stdout()
self.assertIn('Durham, NC', ret)
self.assertIn('Austin, TX', ret)
self.assertEqual(len(ret.split('\n')), 7)
def test_lookup_stdout_not_full(self):
with client.test_mode as t:
t.register_json('/unified_jobs/?order_by=finished&status__in=successful%2Cfailed%2Cerror',
{'count': 2,
'results': [
{'id': 1, 'name': 'Durham, NC'},
{'id': 2, 'name': 'Austin, TX'}
],
'next': None, 'previous': None})
ret = self.res.lookup_stdout(full=False)
self.assertIn('Durham, NC', ret)
self.assertIn('Austin, TX', ret)
self.assertEqual(len(ret.split('\n')), 6)
def test_lookup_stdout_start_n_end(self):
with client.test_mode as t:
t.register_json('/unified_jobs/?order_by=finished&status__in=successful%2Cfailed%2Cerror',
{'count': 2,
'results': [
{'id': 1, 'name': 'Durham, NC'},
{'id': 2, 'name': 'Austin, TX'}
],
'next': None, 'previous': None})
ret = self.res.lookup_stdout(start_line=3, end_line=4)
self.assertIn('Durham, NC', ret)
self.assertNotIn('Austin, TX', ret)
self.assertEqual(len(ret.split('\n')), 2)
def test_launch(self):
with client.test_mode as t:
t.register_json('/workflow_job_templates/1/launch/', {'id': 1}, method='POST')
self.res.launch(workflow_job_template=1)
def test_launch_monitor(self):
with client.test_mode as t:
t.register_json('/workflow_job_templates/1/launch/', {'id': 1}, method='POST')
with mock.patch.object(self.res, 'monitor', mock.MagicMock()) as m:
self.res.launch(workflow_job_template=1, monitor=True)
assert m.called
def test_launch_wait(self):
with client.test_mode as t:
t.register_json('/workflow_job_templates/1/launch/', {'id': 1}, method='POST')
with mock.patch.object(self.res, 'wait', mock.MagicMock()) as m:
self.res.launch(workflow_job_template=1, wait=True)
assert m.called
def test_get_attribute(self):
self.res.__getattribute__('summary')
with self.assertRaises(AttributeError):
self.res.__getattribute__('stdout')
......@@ -18,6 +18,7 @@ import click
from tower_cli import models, resources, get_resource
from tower_cli.api import client
from tower_cli.cli import types
from tower_cli.cli.resource import ResSubcommand
from tower_cli.utils import debug, parser
......@@ -64,7 +65,7 @@ class Resource(models.ExeResource):
if jobs_list['count'] == 0:
return ''
return_content = uj_res.as_command()._format_human(jobs_list)
return_content = ResSubcommand(uj_res)._format_human(jobs_list)
lines = return_content.split('\n')
if not full:
lines = lines[:-1]
......@@ -112,12 +113,12 @@ class Resource(models.ExeResource):
Creates a new workflow job in Ansible Tower, starts it, and
returns back an ID in order for its status to be monitored.
"""
if len(extra_vars) > 0:
if extra_vars is not None and len(extra_vars) > 0:
kwargs['extra_vars'] = parser.process_extra_vars(extra_vars)
debug.log('Launching the workflow job.', header='details')
self._pop_none(kwargs)
post_response = client.post('workflow_job_templates/{}/launch/'.format(
post_response = client.post('workflow_job_templates/{0}/launch/'.format(
workflow_job_template), data=kwargs).json()
workflow_job_id = post_response['id']
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment