Commit c96c61f6 authored by Raphaël Hertzog's avatar Raphaël Hertzog

mail: harden bot to not create UserEmail with invalid emails

Fixes #17
parent 04bc0454
Pipeline #62562 passed with stage
in 32 minutes and 40 seconds
...@@ -13,6 +13,7 @@ commands. ...@@ -13,6 +13,7 @@ commands.
""" """
import re import re
from email.utils import parseaddr
from django.conf import settings from django.conf import settings
...@@ -157,3 +158,14 @@ class Command(metaclass=MetaCommand): ...@@ -157,3 +158,14 @@ class Command(metaclass=MetaCommand):
""" """
for item in items: for item in items:
self.reply(bullet + ' ' + str(item)) self.reply(bullet + ' ' + str(item))
@staticmethod
def validate_email(email):
_, sane_email = parseaddr(email)
if sane_email != email:
return False
if not re.match(r'[^@\s]+@[^@\s]+\.[^@\s]+$', email):
return False
return True
...@@ -189,6 +189,10 @@ class ViewDefaultKeywordsCommand(Command, KeywordCommandMixin): ...@@ -189,6 +189,10 @@ class ViewDefaultKeywordsCommand(Command, KeywordCommandMixin):
self.email) self.email)
def handle(self): def handle(self):
if not self.validate_email(self.email):
self.warning('%s is not a valid email.', self.email)
return
user_email, _ = UserEmail.objects.get_or_create(email=self.email) user_email, _ = UserEmail.objects.get_or_create(email=self.email)
email_settings, _ = \ email_settings, _ = \
EmailSettings.objects.get_or_create(user_email=user_email) EmailSettings.objects.get_or_create(user_email=user_email)
...@@ -274,6 +278,10 @@ class SetDefaultKeywordsCommand(Command, KeywordCommandMixin): ...@@ -274,6 +278,10 @@ class SetDefaultKeywordsCommand(Command, KeywordCommandMixin):
self.keywords) self.keywords)
def handle(self): def handle(self):
if not self.validate_email(self.email):
self.warning('%s is not a valid email.', self.email)
return
keywords = re.split(r'[,\s]+', self.keywords) keywords = re.split(r'[,\s]+', self.keywords)
user_email, _ = UserEmail.objects.get_or_create(email=self.email) user_email, _ = UserEmail.objects.get_or_create(email=self.email)
email_settings, _ = \ email_settings, _ = \
......
...@@ -71,6 +71,11 @@ class SubscribeCommand(Command): ...@@ -71,6 +71,11 @@ class SubscribeCommand(Command):
Implementation of a hook method which is executed instead of Implementation of a hook method which is executed instead of
:py:meth:`handle` when the command is not confirmed. :py:meth:`handle` when the command is not confirmed.
""" """
if not self.validate_email(self.user_email):
self.warning('%s is not a valid email', self.user_email)
return False
settings = get_or_none(EmailSettings, settings = get_or_none(EmailSettings,
user_email__email__iexact=self.user_email) user_email__email__iexact=self.user_email)
if settings and settings.is_subscribed_to(self.package): if settings and settings.is_subscribed_to(self.package):
......
...@@ -59,6 +59,10 @@ class JoinTeam(Command): ...@@ -59,6 +59,10 @@ class JoinTeam(Command):
return team, user_email return team, user_email
def pre_confirm(self): def pre_confirm(self):
if not self.validate_email(self.user_email):
self.warning('%s is not a valid email.', self.user_email)
return False
packed = self.get_team_and_user() packed = self.get_team_and_user()
if packed is None: if packed is None:
return False return False
...@@ -106,8 +110,8 @@ class LeaveTeam(Command): ...@@ -106,8 +110,8 @@ class LeaveTeam(Command):
self.error('Team with the slug "%s" does not exist.', self.error('Team with the slug "%s" does not exist.',
self.team_slug) self.team_slug)
return return
user_email, _ = UserEmail.objects.get_or_create(email=self.user_email) user_email = get_or_none(UserEmail, email=self.user_email)
if user_email not in team.members.all(): if not user_email or user_email not in team.members.all():
self.warning("You are not a member of the team.") self.warning("You are not a member of the team.")
return return
...@@ -167,10 +171,6 @@ class ListTeamPackages(Command): ...@@ -167,10 +171,6 @@ class ListTeamPackages(Command):
return return
return team return team
def get_user_email(self):
user_email, _ = UserEmail.objects.get_or_create(email=self.user_email)
return user_email
def get_command_text(self): def get_command_text(self):
return super(ListTeamPackages, self).get_command_text( return super(ListTeamPackages, self).get_command_text(
self.team_slug) self.team_slug)
...@@ -180,8 +180,8 @@ class ListTeamPackages(Command): ...@@ -180,8 +180,8 @@ class ListTeamPackages(Command):
if not team: if not team:
return return
if not team.public: if not team.public:
user_email = self.get_user_email() user_email = get_or_none(UserEmail, email=self.user_email)
if user_email not in team.members.all(): if not user_email or user_email not in team.members.all():
self.error( self.error(
"The team is private. " "The team is private. "
"Only team members can see its packages.") "Only team members can see its packages.")
...@@ -210,14 +210,10 @@ class WhichTeams(Command): ...@@ -210,14 +210,10 @@ class WhichTeams(Command):
super(WhichTeams, self).__init__() super(WhichTeams, self).__init__()
self.user_email = email self.user_email = email
def get_user_email(self):
user_email, _ = UserEmail.objects.get_or_create(email=self.user_email)
return user_email
def handle(self): def handle(self):
user_email = self.get_user_email() user_email = get_or_none(UserEmail, email=self.user_email)
if user_email.teams.count() == 0: if not user_email or user_email.teams.count() == 0:
self.warning("%s is not a member of any team.", self.user_email) self.warning("%s is not a member of any team.", self.user_email)
else: else:
self.reply("Teams that %s is a member of:", self.user_email) self.reply("Teams that %s is a member of:", self.user_email)
......
...@@ -44,6 +44,7 @@ from distro_tracker.core.utils import ( ...@@ -44,6 +44,7 @@ from distro_tracker.core.utils import (
) )
from distro_tracker.mail import control from distro_tracker.mail import control
from distro_tracker.mail.control.commands import UNIQUE_COMMANDS from distro_tracker.mail.control.commands import UNIQUE_COMMANDS
from distro_tracker.mail.control.commands.base import Command
from distro_tracker.mail.models import CommandConfirmation from distro_tracker.mail.models import CommandConfirmation
from distro_tracker.test import TestCase from distro_tracker.test import TestCase
...@@ -328,6 +329,24 @@ class EmailControlTest(TestCase): ...@@ -328,6 +329,24 @@ class EmailControlTest(TestCase):
return regexp.search(mail.outbox[response_number].body) return regexp.search(mail.outbox[response_number].body)
class CommandHelperMethods(TestCase):
def test_validate_email(self):
self.assertTrue(Command.validate_email('foo@example.net'))
def test_validate_email_refuses_brackets(self):
self.assertFalse(Command.validate_email('<foo@example.net>'))
def test_validate_email_refuses_html_transcode(self):
self.assertFalse(Command.validate_email(
'foo@example.net<mailto:foo@example.net>'))
def test_validate_email_two_arobase(self):
self.assertFalse(Command.validate_email('foo@example@example.net'))
def test_validate_email_no_dot(self):
self.assertFalse(Command.validate_email('foo@example'))
class ControlBotBasic(EmailControlTest): class ControlBotBasic(EmailControlTest):
def test_basic_headers(self): def test_basic_headers(self):
""" """
...@@ -1443,6 +1462,20 @@ class KeywordCommandModifyDefault(EmailControlTest, KeywordCommandHelperMixin): ...@@ -1443,6 +1462,20 @@ class KeywordCommandModifyDefault(EmailControlTest, KeywordCommandHelperMixin):
self.get_new_default_list_output_message(new_user)) self.get_new_default_list_output_message(new_user))
self.assert_keywords_in_response(keywords + all_default_keywords) self.assert_keywords_in_response(keywords + all_default_keywords)
def test_user_invalid_email(self):
"""
Tests adding a keyword to a user that does not exist and has
an invalid email.
"""
new_user = 'bad@email@domain.com'
keywords = [Keyword.objects.filter(default=False)[0].name]
self.add_keyword_command('+', keywords, new_user)
self.control_process()
self.assert_warning_in_response(
'bad@email@domain.com is not a valid email.')
class KeywordCommandShowDefault(EmailControlTest, KeywordCommandHelperMixin): class KeywordCommandShowDefault(EmailControlTest, KeywordCommandHelperMixin):
def setUp(self): def setUp(self):
...@@ -1514,6 +1547,19 @@ class KeywordCommandShowDefault(EmailControlTest, KeywordCommandHelperMixin): ...@@ -1514,6 +1547,19 @@ class KeywordCommandShowDefault(EmailControlTest, KeywordCommandHelperMixin):
user.emailsettings.default_keywords.all() user.emailsettings.default_keywords.all()
) )
def test_show_default_keywords_with_invalid_email(self):
"""
Tests that the keyword command doesn't do anything if the
email submitted is invalid.
"""
email = 'bad@email@domain.com'
self.set_input_lines(['keyword ' + email])
self.control_process()
self.assert_warning_in_response(
'bad@email@domain.com is not a valid email.')
def test_tag_alias_for_keyword(self): def test_tag_alias_for_keyword(self):
""" """
Tests that "tag" is an alias for "keyword" Tests that "tag" is an alias for "keyword"
...@@ -1841,6 +1887,19 @@ class SubscribeToPackageTest(EmailControlTest): ...@@ -1841,6 +1887,19 @@ class SubscribeToPackageTest(EmailControlTest):
self.control_process() # Must not raise anything self.control_process() # Must not raise anything
def test_subscribe_with_invalid_email(self):
"""
Non-regression test for invalid subscriptions that users managed
to create.
"""
bad_email = 'foo@example.net<mailto:foo@example.net>'
self.add_subscribe_command(self.package.name, bad_email)
self.control_process()
self.assertEqual(UserEmail.objects.filter(email=bad_email).count(), 0)
self.assert_in_response('is not a valid email')
class UnsubscribeFromPackageTest(EmailControlTest): class UnsubscribeFromPackageTest(EmailControlTest):
""" """
...@@ -2438,6 +2497,18 @@ class JoinTeamCommandsTests(TeamCommandsMixin, EmailControlTest): ...@@ -2438,6 +2497,18 @@ class JoinTeamCommandsTests(TeamCommandsMixin, EmailControlTest):
self.assert_warning_in_response(self.get_is_member_warning()) self.assert_warning_in_response(self.get_is_member_warning())
def test_join_team_with_invalid_email(self):
"""
Tests that a user gets a warning when trying to join a team
with an invalid email.
"""
self.set_input_lines([self.get_join_command(
self.team.slug, 'foo@bad@example.net')])
self.control_process()
self.assert_warning_in_response(
'foo@bad@example.net is not a valid email.')
class LeaveTeamCommandTests(TeamCommandsMixin, EmailControlTest): class LeaveTeamCommandTests(TeamCommandsMixin, EmailControlTest):
def setUp(self): def setUp(self):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment