Commit 4409178b authored by Ryan Petrello's avatar Ryan Petrello

support app tokens via `docker login --client-id X --client-secret Y`

see: https://github.com/ansible/tower-cli/issues/478
parent 9e57e0be
......@@ -23,6 +23,7 @@ from click.testing import CliRunner
from fauxquests.response import Resp
from fauxquests.utils import URL
import requests
import six.moves.urllib.parse as urlparse
import tower_cli
from tower_cli.api import client, Client
......@@ -372,6 +373,89 @@ class LoginTests(unittest.TestCase):
'Error: Invalid Tower authentication credentials', result.output
)
def test_application_scoped_token(self):
"""Establish that if `tower-cli login` is called with a username,
password, and OAuth2 client ID and secret, we obtain and write an oauth
token to the config file
"""
# Invoke the command.
mock_open = mock.mock_open()
with mock.patch('tower_cli.cli.misc.open', mock_open,
create=True):
with mock.patch.object(os, 'chmod'):
with client.test_mode as t:
# You have to modify this internal private registry to
# register a URL endpoint that _doesn't_ have the version
# prefix
prefix = Client().get_prefix(include_version=False)
t._registry[URL(prefix + 'o/', method='HEAD')] = Resp(
''.encode('utf-8'), 200, {}
)
t._registry[URL(prefix + 'o/token/', method='POST')] = Resp(
json.dumps({'access_token': 'abc123'}).encode('utf-8'),
201, {}
)
result = self.runner.invoke(
login, ['bob', '--password', 'secret', '--client-id',
'abc123', '--client-secret', 'some-secret']
)
# Ensure that we got a zero exit status
self.assertEqual(result.exit_code, 0)
data = urlparse.parse_qs(t.requests[-1].body)
assert data['scope'] == ['write']
assert data['grant_type'] == ['password']
assert data['password'] == ['secret']
assert data['username'] == ['bob']
# Ensure that the output seems to be correct.
self.assertIn(mock.call(os.path.expanduser('~/.tower_cli.cfg'), 'w'),
mock_open.mock_calls)
self.assertIn(mock.call().write('oauth_token = abc123\n'),
mock_open.mock_calls)
def test_public_application_scoped_token(self):
"""Establish that if `tower-cli login` is called with a username,
password, and public OAuth2 client ID, we obtain and write an oauth
token to the config file
"""
# Invoke the command.
mock_open = mock.mock_open()
with mock.patch('tower_cli.cli.misc.open', mock_open,
create=True):
with mock.patch.object(os, 'chmod'):
with client.test_mode as t:
# You have to modify this internal private registry to
# register a URL endpoint that _doesn't_ have the version
# prefix
prefix = Client().get_prefix(include_version=False)
t._registry[URL(prefix + 'o/', method='HEAD')] = Resp(
''.encode('utf-8'), 200, {}
)
t._registry[URL(prefix + 'o/token/', method='POST')] = Resp(
json.dumps({'access_token': 'abc123'}).encode('utf-8'),
201, {}
)
result = self.runner.invoke(
login, ['bob', '--password', 'secret', '--client-id',
'abc123']
)
# Ensure that we got a zero exit status
self.assertEqual(result.exit_code, 0)
data = urlparse.parse_qs(t.requests[-1].body)
assert data['scope'] == ['write']
assert data['grant_type'] == ['password']
assert data['password'] == ['secret']
assert data['username'] == ['bob']
assert data['client_id'] == ['abc123']
# Ensure that the output seems to be correct.
self.assertIn(mock.call(os.path.expanduser('~/.tower_cli.cfg'), 'w'),
mock_open.mock_calls)
self.assertIn(mock.call().write('oauth_token = abc123\n'),
mock_open.mock_calls)
class SupportTests(unittest.TestCase):
"""Establish that support functions in this module work in the way
......
......@@ -212,12 +212,14 @@ def config(key=None, value=None, scope='user', global_=False, unset=False):
@click.command()
@click.argument('username', required=True)
@click.option('--password', required=True, prompt=True, hide_input=True)
@click.option('--client-id', required=False)
@click.option('--client-secret', required=False)
@click.option('--scope', required=False, default='write',
type=click.Choice(['read', 'write']))
@click.option('-v', '--verbose', default=None,
help='Show information about requests being made.', is_flag=True,
required=False, callback=_apply_runtime_setting, is_eager=True)
def login(username, password, scope, verbose):
def login(username, password, scope, client_id, client_secret, verbose):
"""
Retrieves and stores an OAuth2 personal auth token.
"""
......@@ -228,19 +230,50 @@ def login(username, password, scope, verbose):
# Explicitly set a basic auth header for PAT acquisition (so that we don't
# try to auth w/ an existing user+pass or oauth2 token in a config file)
req = collections.namedtuple('req', 'headers')({})
HTTPBasicAuth(username, password)(req)
r = client.post(
'/users/{}/personal_tokens/'.format(username),
data={"description": "Tower CLI", "application": None, "scope": scope},
headers=req.headers
)
if client_id and client_secret:
HTTPBasicAuth(client_id, client_secret)(req)
req.headers['Content-Type'] = 'application/x-www-form-urlencoded'
r = client.post(
'/o/token/',
data={
"grant_type": "password",
"username": username,
"password": password,
"scope": scope
},
headers=req.headers
)
elif client_id:
req.headers['Content-Type'] = 'application/x-www-form-urlencoded'
r = client.post(
'/o/token/',
data={
"grant_type": "password",
"username": username,
"password": password,
"client_id": client_id,
"scope": scope
},
headers=req.headers
)
else:
HTTPBasicAuth(username, password)(req)
r = client.post(
'/users/{}/personal_tokens/'.format(username),
data={"description": "Tower CLI", "application": None, "scope": scope},
headers=req.headers
)
if r.ok:
result = r.json()
result.pop('summary_fields', None)
result.pop('related', None)
token = result.pop('token', None)
if client_id:
token = result.pop('access_token', None)
else:
token = result.pop('token', None)
if settings.verbose:
# only print the actual token if -v
result['token'] = token
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment