Commits on Source (20)
......@@ -10,7 +10,7 @@ vendor="389 Project"
# PACKAGE_VERSION is constructed from these
VERSION_MAJOR=1
VERSION_MINOR=4
VERSION_MAINT=1.4
VERSION_MAINT=1.5
# NOTE: VERSION_PREREL is automatically set for builds made out of a git tree
VERSION_PREREL=
VERSION_DATE=$(date -u +%Y%m%d)
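For orientation, the components above combine into the package version that the Debian changelog below refers to (1.4.1.5). A rough Python illustration of the numbering scheme; the real assembly happens in the build scripts, so treat this only as an assumption about how the pieces fit together:
# Illustrative only: how the version components above yield "1.4.1.5".
VERSION_MAJOR, VERSION_MINOR, VERSION_MAINT = "1", "4", "1.5"
PACKAGE_VERSION = f"{VERSION_MAJOR}.{VERSION_MINOR}.{VERSION_MAINT}"
assert PACKAGE_VERSION == "1.4.1.5"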
......
389-ds-base (1.4.1.4-1) UNRELEASED; urgency=medium
389-ds-base (1.4.1.5-1) unstable; urgency=medium
* New upstream release.
* watch: Use https.
* control: Bump policy to 4.4.0.
* Bump debhelper to 12.
* patches: fix-dsctl-remove.diff, fix-nss-path.diff, icu_pkg-config.patch removed,
upstream. Others refreshed.
* patches: fix-dsctl-remove.diff, fix-nss-path.diff, icu_pkg-config.patch
removed, upstream. Others refreshed.
* rules: Pass --enable-perl, we still need the perl tools.
* *.install: Updated.
-- Timo Aaltonen <tjaalton@debian.org> Mon, 08 Jul 2019 11:46:19 +0300
-- Timo Aaltonen <tjaalton@debian.org> Wed, 10 Jul 2019 10:05:31 +0300
389-ds-base (1.4.0.22-1) unstable; urgency=medium
......
......@@ -430,30 +430,33 @@ def test_dnsalias_keyword_test_nodns_cannot(topo, add_user, aci_of_user):
with pytest.raises(ldap.INSUFFICIENT_ACCESS):
org.replace("seeAlso", "cn=1")
def test_user_can_access_the_data_when_connecting_from_any_machine_2(topo, add_user, aci_of_user):
@pytest.mark.ds50378
@pytest.mark.bz1710848
@pytest.mark.parametrize("ip_addr", ['127.0.0.1', "[::1]"])
def test_user_can_access_from_ipv4_or_ipv6_address(topo, add_user, aci_of_user, ip_addr):
"""
User can access the data when connecting from any machine as per the ACI.
User can modify the data when accessing the server from the allowed IPv4 and IPv6 addresses
:id:461e761e-7ac5-11e8-9ae4-8c16451d917b
:setup: Standalone Server
:steps:
1. Add test entry
2. Add ACI
3. User should follow ACI role
1. Add ACI that has both IPv4 and IPv6
2. Connect from one of the IPs allowed in ACI
3. Modify an attribute
:expectedresults:
1. Entry should be added
2. Operation should succeed
3. Operation should succeed
1. ACI should be added
2. Connection should be successful
3. Operation should be successful
"""
# Add ACI
# Add ACI that contains both IPv4 and IPv6
Domain(topo.standalone, DEFAULT_SUFFIX).\
add("aci", f'(target ="ldap:///{IP_OU_KEY}")(targetattr=*) '
f'(version 3.0; aci "IP aci"; allow(all) '
f'userdn = "ldap:///{FULLIP_KEY}" and ip = "*" ;)')
f'userdn = "ldap:///{FULLIP_KEY}" and (ip = "127.0.0.1" or ip = "::1");)')
# Create a new connection for this test.
conn = UserAccount(topo.standalone, FULLIP_KEY).bind(PW_DM)
conn = UserAccount(topo.standalone, FULLIP_KEY).bind(PW_DM, uri=f'ldap://{ip_addr}:{topo.standalone.port}')
# Perform Operation
OrganizationalUnit(conn, IP_OU_KEY).replace("seeAlso", "cn=1")
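A note on the parametrized addresses: an IPv6 literal has to be written in brackets inside an LDAP URI, which is why the parameter list uses "[::1]" rather than "::1". A minimal sketch of the two bind URIs the test ends up using (the port is just an example value):
# Sketch: how the two parametrized addresses turn into bind URIs.
port = 389  # example; the test uses topo.standalone.port
for ip_addr in ('127.0.0.1', '[::1]'):
    uri = f'ldap://{ip_addr}:{port}'
    print(uri)  # ldap://127.0.0.1:389 and ldap://[::1]:389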
......
......@@ -274,6 +274,7 @@ def test_basic_import_export(topology_st, import_example_ldif):
assert r.present('nstasklog')
assert r.present('nstaskcurrentitem')
assert r.present('nstasktotalitems')
assert r.present('ttl')
r.wait()
......
......@@ -6,16 +6,17 @@
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
from random import sample
import os
import logging
import pytest
from lib389.tasks import *
from lib389.utils import *
from lib389.plugins import *
from lib389.topologies import topology_st
from lib389.plugins import AutoMembershipPlugin, ReferentialIntegrityPlugin, AutoMembershipDefinitions
from lib389.idm.user import UserAccounts
from lib389.idm.group import Groups
from lib389.idm.organizationalunit import OrganizationalUnits
from lib389._constants import DEFAULT_SUFFIX, LOG_ACCESS_LEVEL
from lib389.utils import ds_is_older
import ldap
pytestmark = pytest.mark.tier1
......@@ -80,7 +81,7 @@ def add_group_and_perform_user_operations(topology_st):
assert test_user.dn in group.list_members()
log.info('Renaming user')
test_user.rename('uid=new_test_user_777', newsuperior=SUFFIX)
test_user.rename('uid=new_test_user_777', newsuperior=DEFAULT_SUFFIX)
log.info('Delete the user')
delete_obj(test_user)
......@@ -110,28 +111,36 @@ def enable_plugins(topology_st):
topo.restart()
@pytest.fixture(scope="module")
def add_user_log_level_260(topology_st, enable_plugins):
log.info('Configure access log level to 260 (4 + 256)')
access_log_level = '260'
topology_st.standalone.config.set(LOG_ACCESS_LEVEL, access_log_level)
def add_user_log_level(topology_st, loglevel, request):
topo = topology_st.standalone
default_log_level = topo.config.get_attr_val_utf8(LOG_ACCESS_LEVEL)
log.info(f'Configure access log level to {loglevel}')
topo.config.set(LOG_ACCESS_LEVEL, str(loglevel))
add_group_and_perform_user_operations(topology_st)
def fin():
topo.config.set(LOG_ACCESS_LEVEL, default_log_level)
log.info('Delete the previous access logs for the next test')
topo.deleteAccessLogs()
request.addfinalizer(fin)
@pytest.fixture(scope="module")
def add_user_log_level_516(topology_st, enable_plugins):
log.info('Configure access log level to 516 (4 + 512)')
access_log_level = '516'
topology_st.standalone.config.set(LOG_ACCESS_LEVEL, access_log_level)
add_group_and_perform_user_operations(topology_st)
@pytest.fixture(scope="function")
def add_user_log_level_260(topology_st, enable_plugins, request):
access_log_level = 4 + 256
add_user_log_level(topology_st, access_log_level, request)
@pytest.fixture(scope="module")
def add_user_log_level_131076(topology_st, enable_plugins):
log.info('Configure access log level to 131076 (4 + 131072)')
access_log_level = '131076'
topology_st.standalone.config.set(LOG_ACCESS_LEVEL, access_log_level)
add_group_and_perform_user_operations(topology_st)
@pytest.fixture(scope="function")
def add_user_log_level_516(topology_st, enable_plugins, request):
access_log_level = 4 + 512
add_user_log_level(topology_st, access_log_level, request)
@pytest.fixture(scope="function")
def add_user_log_level_131076(topology_st, enable_plugins, request):
access_log_level = 4 + 131072
add_user_log_level(topology_st, access_log_level, request)
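The access log levels used by these fixtures are additive flags: 4 is the internal-operations flag exercised by the test_internal_log_* cases, and the second addend (256, 512 or 131072) selects one additional access-log flag. A quick check of the arithmetic behind the three fixture values:
# Each fixture level combines the internal-operations flag (4) with exactly one other flag.
INTERNAL_OPS = 4
for level in (260, 516, 131076):
    assert level & INTERNAL_OPS == INTERNAL_OPS
    assert level - INTERNAL_OPS in (256, 512, 131072)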
@pytest.mark.bz1273549
......@@ -156,7 +165,7 @@ def test_check_default(topology_st):
default = topology_st.standalone.config.get_attr_val_utf8(PLUGIN_TIMESTAMP)
# Now check it should be ON by default
assert (default == "on")
assert default == "on"
log.debug(default)
......@@ -283,6 +292,7 @@ def test_internal_log_server_level_0(topology_st):
"""
topo = topology_st.standalone
default_log_level = topo.config.get_attr_val_utf8(LOG_ACCESS_LEVEL)
log.info('Delete the previous access logs')
topo.deleteAccessLogs()
......@@ -308,6 +318,7 @@ def test_internal_log_server_level_0(topology_st):
# conn=Internal(0) op=0
assert not topo.ds_access_log.match(r'.*conn=Internal\([0-9]+\) op=[0-9]+\([0-9]+\)\([0-9]+\).*')
topo.config.set(LOG_ACCESS_LEVEL, default_log_level)
log.info('Delete the previous access logs for the next test')
topo.deleteAccessLogs()
......@@ -333,6 +344,7 @@ def test_internal_log_server_level_4(topology_st):
"""
topo = topology_st.standalone
default_log_level = topo.config.get_attr_val_utf8(LOG_ACCESS_LEVEL)
log.info('Delete the previous access logs for the next test')
topo.deleteAccessLogs()
......@@ -358,6 +370,7 @@ def test_internal_log_server_level_4(topology_st):
# conn=Internal(0) op=0
assert topo.ds_access_log.match(r'.*conn=Internal\([0-9]+\) op=[0-9]+\([0-9]+\)\([0-9]+\).*')
topo.config.set(LOG_ACCESS_LEVEL, default_log_level)
log.info('Delete the previous access logs for the next test')
topo.deleteAccessLogs()
......@@ -398,7 +411,7 @@ def test_internal_log_level_260(topology_st, add_user_log_level_260):
# These comments contain lines we are trying to find without regex (the op numbers are just examples)
log.info("Check the access logs for ADD operation of the user")
# op=10 ADD dn="uid=test_user_777,ou=branch1,dc=example,dc=com"
# op=10 ADD dn="uid=test_user_777,ou=topology_st, branch1,dc=example,dc=com"
assert topo.ds_access_log.match(r'.*op=[0-9]+ ADD dn="uid=test_user_777,ou=branch1,dc=example,dc=com".*')
# (Internal) op=10(1)(1) MOD dn="cn=group,ou=Groups,dc=example,dc=com"
assert topo.ds_access_log.match(r'.*\(Internal\) op=[0-9]+\([0-9]+\)\([0-9]+\) '
......@@ -441,9 +454,6 @@ def test_internal_log_level_260(topology_st, add_user_log_level_260):
# conn=Internal(0) op=0
assert topo.ds_access_log.match(r'.*conn=Internal\([0-9]+\) op=[0-9]+\([0-9]+\)\([0-9]+\).*')
log.info('Delete the previous access logs for the next test')
topo.deleteAccessLogs()
@pytest.mark.bz1358706
@pytest.mark.ds49029
......@@ -525,9 +535,6 @@ def test_internal_log_level_131076(topology_st, add_user_log_level_131076):
# conn=Internal(0) op=0
assert topo.ds_access_log.match(r'.*conn=Internal\([0-9]+\) op=[0-9]+\([0-9]+\)\([0-9]+\).*')
log.info('Delete the previous access logs for the next test')
topo.deleteAccessLogs()
@pytest.mark.bz1358706
@pytest.mark.ds49029
......@@ -618,6 +625,42 @@ def test_internal_log_level_516(topology_st, add_user_log_level_516):
# conn=Internal(0) op=0
assert topo.ds_access_log.match(r'.*conn=Internal\([0-9]+\) op=[0-9]+\([0-9]+\)\([0-9]+\).*')
@pytest.mark.skipif(ds_is_older('1.4.1.4'), reason="Not implemented")
@pytest.mark.bz1358706
@pytest.mark.ds49232
def test_access_log_truncated_search_message(topology_st):
"""Tests that the access log message is properly truncated when the message is too long
:id: 0a9af37d-3311-4a2f-ac0a-9a1c631aaf27
:setup: Standalone instance
:steps:
1. Make a search with a 2048+ characters basedn, filter and attribute list
2. Check the access log has the message and it's truncated
:expectedresults:
1. Operation should be successful
2. Access log should contain truncated basedn, filter and attribute list
"""
topo = topology_st.standalone
large_str_base = "".join("cn=test," for _ in range(512))
large_str_filter = "".join("(cn=test)" for _ in range(512))
users = UserAccounts(topo, f'{large_str_base}dc=ending')
users._list_attrlist = [f'cn{i}' for i in range(512)]
log.info("Make a search")
users.filter(f'(|(objectclass=tester){large_str_filter}(cn=ending))')
log.info('Restart the server to flush the logs')
topo.restart()
assert topo.ds_access_log.match(r'.*cn=test,cn=test,.*')
assert topo.ds_access_log.match(r'.*objectClass=tester.*')
assert topo.ds_access_log.match(r'.*cn10.*')
assert not topo.ds_access_log.match(r'.*dc=ending.*')
assert not topo.ds_access_log.match(r'.*cn=ending.*')
assert not topo.ds_access_log.match(r'.*cn500.*')
log.info('Delete the previous access logs for the next test')
topo.deleteAccessLogs()
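Each component of that search is built well past 2048 characters, which is why the logged SRCH line has to be truncated and why the tail parts (dc=ending, cn=ending, cn500) must not appear in the log. A small check of the sizes, assuming the same construction as the test:
# Size arithmetic behind the truncation test above.
large_str_base = "".join("cn=test," for _ in range(512))     # 512 * 8 = 4096 chars
large_str_filter = "".join("(cn=test)" for _ in range(512))  # 512 * 9 = 4608 chars
attrs = [f'cn{i}' for i in range(512)]                       # cn0 .. cn511
assert len(large_str_base) > 2048 and len(large_str_filter) > 2048
assert len(attrs) == 512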
......
......@@ -403,6 +403,67 @@ def test_inconsistencies(topo_tls_ldapi):
user_m1.delete()
def test_suffix_exists(topo_tls_ldapi):
"""Check if wrong suffix is provided, server is giving Error: Failed
to validate suffix.
:id: ce75debc-c07f-4e72-8787-8f99cbfaf1e2
:setup: Two master replication
:steps:
1. Run ds-replcheck with wrong suffix (Non Existing)
:expectedresults:
1. It should be unsuccessful
"""
m1 = topo_tls_ldapi.ms["master1"]
m2 = topo_tls_ldapi.ms["master2"]
ds_replcheck_path = os.path.join(m1.ds_paths.bin_dir, 'ds-replcheck')
if ds_is_newer("1.4.1.2"):
tool_cmd = [ds_replcheck_path, 'online', '-b', 'dc=test,dc=com', '-D', DN_DM, '-w', PW_DM,
'-m', 'ldaps://{}:{}'.format(m1.host, m1.sslport),
'-r', 'ldaps://{}:{}'.format(m2.host, m2.sslport)]
else:
tool_cmd = [ds_replcheck_path, '-b', 'dc=test,dc=com', '-D', DN_DM, '-w', PW_DM,
'-m', 'ldaps://{}:{}'.format(m1.host, m1.sslport),
'-r', 'ldaps://{}:{}'.format(m2.host, m2.sslport)]
result1 = subprocess.Popen(tool_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding='utf-8')
result = result1.communicate()
assert "Failed to validate suffix" in result[0]
def test_check_missing_tombstones(topo_tls_ldapi):
"""Check missing tombstone entries is not reported.
:id: 93067a5a-416e-4243-9418-c4dfcf42e093
:setup: Two master replication
:steps:
1. Pause replication between master and replica
2. Add and delete an entry on the master
3. Run ds-replcheck
4. Verify there are NO complaints about missing entries/tombstones
:expectedresults:
1. It should be successful
2. It should be successful
3. It should be successful
4. It should be successful
"""
m1 = topo_tls_ldapi.ms["master1"]
m2 = topo_tls_ldapi.ms["master2"]
try:
topo_tls_ldapi.pause_all_replicas()
users_m1 = UserAccounts(m1, DEFAULT_SUFFIX)
user0 = users_m1.create_test_user(1000)
user0.delete()
for tool_cmd in replcheck_cmd_list(topo_tls_ldapi):
result = subprocess.check_output(tool_cmd, encoding='utf-8').lower()
assert "entries missing on replica" not in result
finally:
topo_tls_ldapi.resume_all_replicas()
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
......
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2019 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
"""
Verify and test indexing filters from a search.
"""
import os
import pytest
from lib389._constants import DEFAULT_SUFFIX, PW_DM
from lib389.topologies import topology_st as topo
from lib389.idm.user import UserAccounts
from lib389.idm.account import Accounts
from lib389.cos import CosTemplates
from lib389.schema import Schema
pytestmark = pytest.mark.tier1
FILTERS = ["(|(|(ou=nothing1)(ou=people))(|(ou=nothing2)(ou=nothing3)))",
"(|(|(ou=people)(ou=nothing1))(|(ou=nothing2)(ou=nothing3)))",
"(|(|(ou=nothing1)(ou=nothing2))(|(ou=people)(ou=nothing3)))",
"(|(|(ou=nothing1)(ou=nothing2))(|(ou=nothing3)(ou=people)))",
"(&(sn<=0000000000000000)(givenname>=FFFFFFFFFFFFFFFF))",
"(&(sn>=0000000000000000)(sn<=1111111111111111))",
"(&(sn>=0000000000000000)(givenname<=FFFFFFFFFFFFFFFF))"]
INDEXES = ["(uidNumber=18446744073709551617)",
"(gidNumber=18446744073709551617)",
"(MYINTATTR=18446744073709551617)",
"(&(uidNumber=*)(!(uidNumber=18446744073709551617)))",
"(&(gidNumber=*)(!(gidNumber=18446744073709551617)))",
"(&(uidNumber=*)(!(gidNumber=18446744073709551617)))",
"(&(myintattr=*)(!(myintattr=18446744073709551617)))",
"(uidNumber>=-18446744073709551617)",
"(gidNumber>=-18446744073709551617)",
"(uidNumber<=18446744073709551617)",
"(gidNumber<=18446744073709551617)",
"(myintattr<=18446744073709551617)"]
INDEXES_FALSE = ["(gidNumber=54321)",
"(uidNumber=54321)",
"(myintattr=54321)",
"(gidNumber<=-999999999999999999999999999999)",
"(uidNumber<=-999999999999999999999999999999)",
"(myintattr<=-999999999999999999999999999999)",
"(gidNumber>=999999999999999999999999999999)",
"(uidNumber>=999999999999999999999999999999)",
"(myintattr>=999999999999999999999999999999)"]
@pytest.fixture(scope="module")
def _create_entries(topo):
"""
Will create necessary users for this script.
"""
# Creating Users
users_people = UserAccounts(topo.standalone, DEFAULT_SUFFIX)
for count in range(3):
users_people.create(properties={
'ou': ['Accounting', 'People'],
'cn': f'User {count}F',
'sn': f'{count}' * 16,
'givenname': 'FFFFFFFFFFFFFFFF',
'uid': f'user{count}F',
'mail': f'user{count}F@test.com',
'manager': f'uid=user{count}F,ou=People,{DEFAULT_SUFFIX}',
'userpassword': PW_DM,
'homeDirectory': '/home/' + f'user{count}F',
'uidNumber': '1000',
'gidNumber': '2000',
})
cos = CosTemplates(topo.standalone, DEFAULT_SUFFIX, rdn='ou=People')
for user, number, des in [('a', '18446744073709551617', '2^64+1'),
('b', '18446744073709551618', '2^64+2'),
('c', '-18446744073709551617', '-(2^64+1)'),
('d', '-18446744073709551618', '-(2^64+2)'),
('e', '0', '0'),
('f', '2', '2'),
('g', '-2', '-2')]:
cos.create(properties={
'cn': user,
'uidnumber': number,
'gidnumber': number,
'myintattr': number,
'description': f'uidnumber value {des} - gidnumber is same but not indexed'
})
@pytest.mark.parametrize("real_value", FILTERS)
def test_positive(topo, _create_entries, real_value):
"""Test positive filters
:id: 57243326-91ae-11e9-aca3-8c16451d917b
:setup: Standalone
:steps:
1. Search with the filter under test
:expectedresults:
1. Pass
"""
assert Accounts(topo.standalone, DEFAULT_SUFFIX).filter(real_value)
def test_indexing_schema(topo, _create_entries):
"""Test with schema
:id: 67a2179a-91ae-11e9-9a33-8c16451d917b
:setup: Standalone
:steps:
1. Add attribute types to Schema.
2. Search with an ordering filter on the new attribute
:expectedresults:
1. Pass
2. Pass
"""
cos = CosTemplates(topo.standalone, DEFAULT_SUFFIX, rdn='ou=People')
Schema(topo.standalone).add('attributetypes',
"( 8.9.10.11.12.13.14.15 NAME 'myintattr' DESC 'for integer "
"syntax index ordering testing' EQUALITY integerMatch ORDERING "
"integerOrderingMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 )")
topo.standalone.restart()
assert cos.filter("(myintattr>=-18446744073709551617)")
@pytest.mark.parametrize("real_value", INDEXES)
def test_indexing(topo, _create_entries, real_value):
"""Test positive index filters
:id: 7337589a-91ae-11e9-ad44-8c16451d917b
:setup: Standalone
:steps:
1. Search with the index filter under test
:expectedresults:
1. Pass
"""
cos = CosTemplates(topo.standalone, DEFAULT_SUFFIX, rdn='ou=People')
assert cos.filter(real_value)
@pytest.mark.parametrize("real_value", INDEXES_FALSE)
def test_indexing_negative(topo, _create_entries, real_value):
"""Test negative index filters
:id: 7e19deae-91ae-11e9-900c-8c16451d917b
:setup: Standalone
:steps:
1. Search with a filter that should not match any entry
:expectedresults:
1. Fail
"""
cos = CosTemplates(topo.standalone, DEFAULT_SUFFIX, rdn='ou=People')
assert not cos.filter(real_value)
if __name__ == '__main__':
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s -v %s" % CURRENT_FILE)
......@@ -106,117 +106,184 @@ TESTED_MATCHING_RULES = ["bitStringMatch", "caseExactIA5Match", "caseExactMatch"
"octetStringOrderingMatch"]
MATCHING_RULES = [('addentrybitStringMatch', 'attrbitStringMatch',
["'0010'B", "'0011'B", "'0100'B", "'0101'B", "'0110'B"],
["'0001'B", "'0001'B", "'0010'B", "'0010'B", "'0011'B",
MATCHING_RULES = [
{'attr': 'attrbitStringMatch',
'positive': ["'0010'B", "'0011'B", "'0100'B", "'0101'B", "'0110'B"],
'negative': ["'0001'B", "'0001'B", "'0010'B", "'0010'B", "'0011'B",
"'0011'B", "'0100'B", "'0100'B", "'0101'B",
"'0101'B", "'0110'B", "'0110'B"]),
('addentrycaseExactIA5Match', 'attrcaseExactIA5Match',
['sPrain', 'spRain', 'sprAin', 'spraIn', 'sprain'],
['Sprain', 'Sprain', 'Sprain', 'Sprain', 'SpRain',
"'0101'B", "'0110'B", "'0110'B"]},
{'attr': 'attrcaseExactIA5Match',
'positive': ['sPrain', 'spRain', 'sprAin', 'spraIn', 'sprain'],
'negative': ['Sprain', 'Sprain', 'Sprain', 'Sprain', 'SpRain',
'SpRain', 'SprAin', 'SprAin', 'SpraIn', 'SpraIn',
'Sprain', 'Sprain']),
('addentrycaseExactMatch', 'attrcaseExactMatch',
['ÇéliNé Ändrè', 'Çéliné ÄndrÈ', 'Çéliné Ändrè', 'çÉliné Ändrè'],
['ÇélIné Ändrè', 'ÇélIné Ändrè', 'ÇéliNé Ändrè', 'ÇéliNé Ändrè',
'Sprain', 'Sprain']},
{'attr': 'attrcaseExactMatch',
'positive': ['ÇéliNé Ändrè', 'Çéliné ÄndrÈ', 'Çéliné Ändrè', 'çÉliné Ändrè'],
'negative': ['ÇélIné Ändrè', 'ÇélIné Ändrè', 'ÇéliNé Ändrè', 'ÇéliNé Ändrè',
'Çéliné ÄndrÈ', 'Çéliné ÄndrÈ', 'Çéliné Ändrè', 'Çéliné Ändrè',
'çÉliné Ändrè', 'çÉliné Ändrè']),
('addentrygeneralizedTimeMatch', 'attrgeneralizedTimeMatch',
['20100218171301Z', '20100218171302Z', '20100218171303Z',
'çÉliné Ändrè', 'çÉliné Ändrè']},
{'attr': 'attrgeneralizedTimeMatch',
'positive': ['20100218171301Z', '20100218171302Z', '20100218171303Z',
'20100218171304Z', '20100218171305Z'],
['20100218171300Z', '20100218171300Z', '20100218171301Z',
'negative': ['20100218171300Z', '20100218171300Z', '20100218171301Z',
'20100218171301Z', '20100218171302Z', '20100218171302Z',
'20100218171303Z', '20100218171303Z', '20100218171304Z',
'20100218171304Z', '20100218171305Z', '20100218171305Z']),
('addentrybooleanMatch', 'attrbooleanMatch',
['FALSE'],
['TRUE', 'TRUE', 'FALSE', 'FALSE']),
('addentrycaseIgnoreIA5Match', 'attrcaseIgnoreIA5Match',
['sprain2', 'sprain3', 'sprain4', 'sprain5', 'sprain6'],
['sprain1', 'sprain1', 'sprain2', 'sprain2', 'sprain3',
'20100218171304Z', '20100218171305Z', '20100218171305Z']},
{'attr': 'attrbooleanMatch',
'positive': ['FALSE'],
'negative': ['TRUE', 'TRUE', 'FALSE', 'FALSE']},
{'attr': 'attrcaseIgnoreIA5Match',
'positive': ['sprain2', 'sprain3', 'sprain4', 'sprain5', 'sprain6'],
'negative': ['sprain1', 'sprain1', 'sprain2', 'sprain2', 'sprain3',
'sprain3', 'sprain4', 'sprain4', 'sprain5', 'sprain5',
'sprain6', 'sprain6']),
('addentrycaseIgnoreMatch', 'attrcaseIgnoreMatch',
['ÇélIné Ändrè2', 'ÇélIné Ändrè3', 'ÇélIné Ändrè4',
'sprain6', 'sprain6']},
{'attr': 'attrcaseIgnoreMatch',
'positive': ['ÇélIné Ändrè2', 'ÇélIné Ändrè3', 'ÇélIné Ändrè4',
'ÇélIné Ändrè5', 'ÇélIné Ändrè6'],
['ÇélIné Ändrè1', 'ÇélIné Ändrè1', 'ÇélIné Ändrè2',
'negative': ['ÇélIné Ändrè1', 'ÇélIné Ändrè1', 'ÇélIné Ändrè2',
'ÇélIné Ändrè2', 'ÇélIné Ändrè3', 'ÇélIné Ändrè3',
'ÇélIné Ändrè4', 'ÇélIné Ändrè4', 'ÇélIné Ändrè5',
'ÇélIné Ändrè5', 'ÇélIné Ändrè6', 'ÇélIné Ändrè6']),
('addentrycaseIgnoreListMatch', 'attrcaseIgnoreListMatch',
['foo2$bar', 'foo3$bar', 'foo4$bar', 'foo5$bar', 'foo6$bar'],
['foo1$bar', 'foo1$bar', 'foo2$bar', 'foo2$bar', 'foo3$bar',
'ÇélIné Ändrè5', 'ÇélIné Ändrè6', 'ÇélIné Ändrè6']},
{'attr': 'attrcaseIgnoreListMatch',
'positive': ['foo2$bar', 'foo3$bar', 'foo4$bar', 'foo5$bar', 'foo6$bar'],
'negative': ['foo1$bar', 'foo1$bar', 'foo2$bar', 'foo2$bar', 'foo3$bar',
'foo3$bar', 'foo4$bar', 'foo4$bar', 'foo5$bar', 'foo5$bar',
'foo6$bar', 'foo6$bar']),
('addentryobjectIdentifierMatch', 'attrobjectIdentifierMatch',
['1.3.6.1.4.1.1466.115.121.1.24', '1.3.6.1.4.1.1466.115.121.1.26',
'foo6$bar', 'foo6$bar']},
{'attr': 'attrobjectIdentifierMatch',
'positive': ['1.3.6.1.4.1.1466.115.121.1.24', '1.3.6.1.4.1.1466.115.121.1.26',
'1.3.6.1.4.1.1466.115.121.1.40', '1.3.6.1.4.1.1466.115.121.1.41',
'1.3.6.1.4.1.1466.115.121.1.6'],
['1.3.6.1.4.1.1466.115.121.1.15', '1.3.6.1.4.1.1466.115.121.1.15',
'negative': ['1.3.6.1.4.1.1466.115.121.1.15', '1.3.6.1.4.1.1466.115.121.1.15',
'1.3.6.1.4.1.1466.115.121.1.24', '1.3.6.1.4.1.1466.115.121.1.24',
'1.3.6.1.4.1.1466.115.121.1.26', '1.3.6.1.4.1.1466.115.121.1.26',
'1.3.6.1.4.1.1466.115.121.1.40', '1.3.6.1.4.1.1466.115.121.1.40',
'1.3.6.1.4.1.1466.115.121.1.41', '1.3.6.1.4.1.1466.115.121.1.41',
'1.3.6.1.4.1.1466.115.121.1.6', '1.3.6.1.4.1.1466.115.121.1.6']),
('addentrydirectoryStringFirstComponentMatch',
'attrdirectoryStringFirstComponentMatch',
['ÇélIné Ändrè2', 'ÇélIné Ändrè3', 'ÇélIné Ändrè4', 'ÇélIné Ändrè5',
'1.3.6.1.4.1.1466.115.121.1.6', '1.3.6.1.4.1.1466.115.121.1.6']},
{'attr': 'attrdirectoryStringFirstComponentMatch',
'positive': ['ÇélIné Ändrè2', 'ÇélIné Ändrè3', 'ÇélIné Ändrè4', 'ÇélIné Ändrè5',
'ÇélIné Ändrè6'],
['ÇélIné Ändrè1', 'ÇélIné Ändrè1', 'ÇélIné Ändrè2', 'ÇélIné Ändrè2',
'negative': ['ÇélIné Ändrè1', 'ÇélIné Ändrè1', 'ÇélIné Ändrè2', 'ÇélIné Ändrè2',
'ÇélIné Ändrè3', 'ÇélIné Ändrè3', 'ÇélIné Ändrè4', 'ÇélIné Ändrè4',
'ÇélIné Ändrè5', 'ÇélIné Ändrè5', 'ÇélIné Ändrè6', 'ÇélIné Ändrè6']),
('addentryobjectIdentifierFirstComponentMatch',
'attrobjectIdentifierFirstComponentMatch',
['1.3.6.1.4.1.1466.115.121.1.24', '1.3.6.1.4.1.1466.115.121.1.26',
'ÇélIné Ändrè5', 'ÇélIné Ändrè5', 'ÇélIné Ändrè6', 'ÇélIné Ändrè6']},
{'attr': 'attrobjectIdentifierFirstComponentMatch',
'positive': ['1.3.6.1.4.1.1466.115.121.1.24', '1.3.6.1.4.1.1466.115.121.1.26',
'1.3.6.1.4.1.1466.115.121.1.40', '1.3.6.1.4.1.1466.115.121.1.41',
'1.3.6.1.4.1.1466.115.121.1.6'],
['1.3.6.1.4.1.1466.115.121.1.15', '1.3.6.1.4.1.1466.115.121.1.15',
'negative': ['1.3.6.1.4.1.1466.115.121.1.15', '1.3.6.1.4.1.1466.115.121.1.15',
'1.3.6.1.4.1.1466.115.121.1.24', '1.3.6.1.4.1.1466.115.121.1.24',
'1.3.6.1.4.1.1466.115.121.1.26', '1.3.6.1.4.1.1466.115.121.1.26',
'1.3.6.1.4.1.1466.115.121.1.40', '1.3.6.1.4.1.1466.115.121.1.40',
'1.3.6.1.4.1.1466.115.121.1.41', '1.3.6.1.4.1.1466.115.121.1.41',
'1.3.6.1.4.1.1466.115.121.1.6', '1.3.6.1.4.1.1466.115.121.1.6']),
('addentrydistinguishedNameMatch', 'attrdistinguishedNameMatch',
['cn=foo2,cn=bar', 'cn=foo3,cn=bar', 'cn=foo4,cn=bar',
'1.3.6.1.4.1.1466.115.121.1.6', '1.3.6.1.4.1.1466.115.121.1.6']},
{'attr': 'attrdistinguishedNameMatch',
'positive': ['cn=foo2,cn=bar', 'cn=foo3,cn=bar', 'cn=foo4,cn=bar',
'cn=foo5,cn=bar', 'cn=foo6,cn=bar'],
['cn=foo1,cn=bar', 'cn=foo1,cn=bar', 'cn=foo2,cn=bar',
'negative': ['cn=foo1,cn=bar', 'cn=foo1,cn=bar', 'cn=foo2,cn=bar',
'cn=foo2,cn=bar', 'cn=foo3,cn=bar', 'cn=foo3,cn=bar',
'cn=foo4,cn=bar', 'cn=foo4,cn=bar', 'cn=foo5,cn=bar',
'cn=foo5,cn=bar', 'cn=foo6,cn=bar', 'cn=foo6,cn=bar']),
('addentryintegerMatch', 'attrintegerMatch',
['-1', '0', '1', '2', '3'],
['-2', '-2', '-1', '-1', '0', '0', '1', '1', '2', '2', '3', '3']),
('addentryintegerFirstComponentMatch', 'attrintegerFirstComponentMatch',
['-1', '0', '1', '2', '3'],
['-2', '-2', '-1', '-1', '0', '0', '1', '1', '2', '2', '3', '3']),
('addentryuniqueMemberMatch', 'attruniqueMemberMatch',
["cn=foo2,cn=bar#'0010'B", "cn=foo3,cn=bar#'0011'B",
'cn=foo5,cn=bar', 'cn=foo6,cn=bar', 'cn=foo6,cn=bar']},
{'attr': 'attrintegerMatch',
'positive': ['-1', '0', '1', '2', '3'],
'negative': ['-2', '-2', '-1', '-1', '0', '0', '1', '1', '2', '2', '3', '3']},
{'attr': 'attrintegerFirstComponentMatch',
'positive': ['-1', '0', '1', '2', '3'],
'negative': ['-2', '-2', '-1', '-1', '0', '0', '1', '1', '2', '2', '3', '3']},
{'attr': 'attruniqueMemberMatch',
'positive': ["cn=foo2,cn=bar#'0010'B", "cn=foo3,cn=bar#'0011'B",
"cn=foo4,cn=bar#'0100'B", "cn=foo5,cn=bar#'0101'B",
"cn=foo6,cn=bar#'0110'B"],
["cn=foo1,cn=bar#'0001'B", "cn=foo1,cn=bar#'0001'B",
'negative': ["cn=foo1,cn=bar#'0001'B", "cn=foo1,cn=bar#'0001'B",
"cn=foo2,cn=bar#'0010'B", "cn=foo2,cn=bar#'0010'B",
"cn=foo3,cn=bar#'0011'B", "cn=foo3,cn=bar#'0011'B",
"cn=foo4,cn=bar#'0100'B", "cn=foo4,cn=bar#'0100'B",
"cn=foo5,cn=bar#'0101'B", "cn=foo5,cn=bar#'0101'B",
"cn=foo6,cn=bar#'0110'B", "cn=foo6,cn=bar#'0110'B"]),
('addentrynumericStringMatch', 'attrnumericStringMatch',
['00002', '00003', '00004', '00005', '00006'],
['00001', '00001', '00002', '00002', '00003', '00003', '00004',
'00004', '00005', '00005', '00006', '00006']),
('addentrytelephoneNumberMatch', 'attrtelephoneNumberMatch',
['+1 408 555 5625', '+1 408 555 6201', '+1 408 555 8585',
"cn=foo6,cn=bar#'0110'B", "cn=foo6,cn=bar#'0110'B"]},
{'attr': 'attrnumericStringMatch',
'positive': ['00002', '00003', '00004', '00005', '00006'],
'negative': ['00001', '00001', '00002', '00002', '00003', '00003', '00004',
'00004', '00005', '00005', '00006', '00006']},
{'attr': 'attrtelephoneNumberMatch',
'positive': ['+1 408 555 5625', '+1 408 555 6201', '+1 408 555 8585',
'+1 408 555 9187', '+1 408 555 9423'],
['+1 408 555 4798', '+1 408 555 4798', '+1 408 555 5625',
'negative': ['+1 408 555 4798', '+1 408 555 4798', '+1 408 555 5625',
'+1 408 555 5625', '+1 408 555 6201', '+1 408 555 6201',
'+1 408 555 8585', '+1 408 555 8585', '+1 408 555 9187',
'+1 408 555 9187', '+1 408 555 9423', '+1 408 555 9423']),
('addentryoctetStringMatch', 'attroctetStringMatch',
['AAAAAAAAAAAAAAI=', 'AAAAAAAAAAAAAAM=', 'AAAAAAAAAAAAAAQ=',
'+1 408 555 9187', '+1 408 555 9423', '+1 408 555 9423']},
{'attr': 'attroctetStringMatch',
'positive': ['AAAAAAAAAAAAAAI=', 'AAAAAAAAAAAAAAM=', 'AAAAAAAAAAAAAAQ=',
'AAAAAAAAAAAAAAU=', 'AAAAAAAAAAAAAAY='],
['AAAAAAAAAAAAAAE=', 'AAAAAAAAAAAAAAE=', 'AAAAAAAAAAAAAAI=',
'negative': ['AAAAAAAAAAAAAAE=', 'AAAAAAAAAAAAAAE=', 'AAAAAAAAAAAAAAI=',
'AAAAAAAAAAAAAAI=', 'AAAAAAAAAAAAAAM=', 'AAAAAAAAAAAAAAM=',
'AAAAAAAAAAAAAAQ=', 'AAAAAAAAAAAAAAQ=', 'AAAAAAAAAAAAAAU=',
'AAAAAAAAAAAAAAU=', 'AAAAAAAAAAAAAAY=', 'AAAAAAAAAAAAAAY='])]
'AAAAAAAAAAAAAAU=', 'AAAAAAAAAAAAAAY=', 'AAAAAAAAAAAAAAY=']}]
MATCHING_MODES = [
{'attr': 'attrbitStringMatch',
'positive': ["'0001'B"],
'negative': ["'0001'B", "'0010'B", "'0011'B", "'0100'B", "'0101'B", "'0110'B"]},
{'attr': 'attrcaseExactIA5Match',
'positive': 'Sprain',
'negative': ['Sprain', 'sPrain', 'spRain', 'sprAin', 'spraIn', 'sprain']},
{'attr': 'attrcaseExactMatch',
'positive': 'ÇélIné Ändrè',
'negative': ['ÇélIné Ändrè', 'ÇéliNé Ändrè', 'Çéliné ÄndrÈ', 'Çéliné Ändrè', 'çÉliné Ändrè']},
{'attr': 'attrgeneralizedTimeMatch',
'positive': '20100218171300Z',
'negative': ['20100218171300Z', '20100218171301Z', '20100218171302Z',
'20100218171303Z', '20100218171304Z', '20100218171305Z']},
{'attr': 'attrbooleanMatch',
'positive': 'TRUE',
'negative': ['TRUE', 'FALSE']},
{'attr': 'attrcaseIgnoreIA5Match',
'positive': 'sprain1',
'negative': ['sprain1', 'sprain2', 'sprain3', 'sprain4', 'sprain5', 'sprain6']},
{'attr': 'attrcaseIgnoreMatch',
'positive': 'ÇélIné Ändrè1',
'negative': ['ÇélIné Ändrè1', 'ÇélIné Ändrè2', 'ÇélIné Ändrè3', 'ÇélIné Ändrè4',
'ÇélIné Ändrè5', 'ÇélIné Ändrè6']},
{'attr': 'attrcaseIgnoreListMatch',
'positive': 'foo1$bar',
'negative': ['foo1$bar', 'foo2$bar', 'foo3$bar', 'foo4$bar', 'foo5$bar', 'foo6$bar']},
{'attr': 'attrobjectIdentifierMatch',
'positive': '1.3.6.1.4.1.1466.115.121.1.15',
'negative': ['1.3.6.1.4.1.1466.115.121.1.15', '1.3.6.1.4.1.1466.115.121.1.24',
'1.3.6.1.4.1.1466.115.121.1.26', '1.3.6.1.4.1.1466.115.121.1.40',
'1.3.6.1.4.1.1466.115.121.1.41', '1.3.6.1.4.1.1466.115.121.1.6']},
{'attr': 'attrdirectoryStringFirstComponentMatch',
'positive': 'ÇélIné Ändrè1',
'negative': ['ÇélIné Ändrè1', 'ÇélIné Ändrè2', 'ÇélIné Ändrè3', 'ÇélIné Ändrè4',
'ÇélIné Ändrè5', 'ÇélIné Ändrè6']},
{'attr': 'attrobjectIdentifierFirstComponentMatch',
'positive': '1.3.6.1.4.1.1466.115.121.1.15',
'negative': ['1.3.6.1.4.1.1466.115.121.1.15', '1.3.6.1.4.1.1466.115.121.1.24',
'1.3.6.1.4.1.1466.115.121.1.26', '1.3.6.1.4.1.1466.115.121.1.40',
'1.3.6.1.4.1.1466.115.121.1.41', '1.3.6.1.4.1.1466.115.121.1.6']},
{'attr': 'attrdistinguishedNameMatch',
'positive': 'cn=foo1,cn=bar',
'negative': ['cn=foo1,cn=bar', 'cn=foo2,cn=bar', 'cn=foo3,cn=bar', 'cn=foo4,cn=bar',
'cn=foo5,cn=bar', 'cn=foo6,cn=bar']},
{'attr': 'attrintegerMatch',
'positive': '-2',
'negative': ['-2', '-1', '0', '1', '2', '3']},
{'attr': 'attrintegerFirstComponentMatch',
'positive': '-2',
'negative': ['-2', '-1', '0', '1', '2', '3']},
{'attr': 'attruniqueMemberMatch',
'positive': "cn=foo1,cn=bar#'0001'B",
'negative': ["cn=foo1,cn=bar#'0001'B", "cn=foo2,cn=bar#'0010'B", "cn=foo3,cn=bar#'0011'B",
"cn=foo4,cn=bar#'0100'B", "cn=foo5,cn=bar#'0101'B", "cn=foo6,cn=bar#'0110'B"]},
{'attr': 'attrnumericStringMatch',
'positive': '00001',
'negative': ['00001', '00002', '00003', '00004', '00005', '00006']},
{'attr': 'attrtelephoneNumberMatch',
'positive': '+1 408 555 4798',
'negative': ['+1 408 555 4798', '+1 408 555 5625', '+1 408 555 6201', '+1 408 555 8585',
'+1 408 555 9187', '+1 408 555 9423']},
{'attr': 'attroctetStringMatch',
'positive': 'AAAAAAAAAAAAAAE=',
'negative': ['AAAAAAAAAAAAAAE=', 'AAAAAAAAAAAAAAI=', 'AAAAAAAAAAAAAAM=', 'AAAAAAAAAAAAAAQ=',
'AAAAAAAAAAAAAAU=', 'AAAAAAAAAAAAAAY=']}]
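The refactored dicts drop the old 'addentry…' cn values, so the tests below rebuild them from the attribute name by stripping the 'attr' prefix. A one-line illustration of that derivation:
# 'attrbitStringMatch'.split('attr') -> ['', 'bitStringMatch'], hence the cn built below.
attr = 'attrbitStringMatch'
cn = 'addentry' + attr.split('attr')[1]
assert cn == 'addentrybitStringMatch'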
def test_matching_rules(topology_st):
......@@ -251,8 +318,8 @@ def test_add_attribute_types(topology_st):
Schema(topology_st.standalone).add('attributetypes', attribute)
@pytest.mark.parametrize("cn_cn, attr, positive, negative", MATCHING_RULES)
def test_valid_invalid_attributes(topology_st, cn_cn, attr, positive, negative):
@pytest.mark.parametrize("rule", MATCHING_RULES)
def test_valid_invalid_attributes(topology_st, rule):
"""Test valid and invalid values of attributes
:id: 7ec19eca-8cfc-11e9-a0df-8c16451d917b
:setup: Standalone
......@@ -267,13 +334,37 @@ def test_valid_invalid_attributes(topology_st, cn_cn, attr, positive, negative):
3. Fail
"""
cos = CosTemplates(topology_st.standalone, DEFAULT_SUFFIX)
cos.create(properties={'cn': cn_cn,
attr: positive})
cos.create(properties={'cn': 'addentry'+rule['attr'].split('attr')[1],
rule['attr']: rule['positive']})
for entry in cos.list():
entry.delete()
with pytest.raises(ldap.TYPE_OR_VALUE_EXISTS):
cos.create(properties={'cn': cn_cn,
attr: negative})
cos.create(properties={'cn': 'addentry'+rule['attr'].split('attr')[1],
rule['attr']: rule['negative']})
@pytest.mark.parametrize("mode", MATCHING_MODES)
def test_valid_invalid_modes(topology_st, mode):
"""Test valid and invalid values of attributes modes
:id: 7ec19eca-8cfc-11e9-a0df-8c16451d917b
:setup: Standalone
:steps:
1. Create entry with an attribute that uses matching mode
2. Add a value to that attribute that is a duplicate according to
   its equality matching rule
3. Delete existing entry
:expectedresults:
1. Pass
2. Fail
3. Pass
"""
cos = CosTemplates(topology_st.standalone, DEFAULT_SUFFIX)
cos.create(properties={'cn': 'addentry'+mode['attr'].split('attr')[1],
mode['attr']: mode['positive']})
with pytest.raises(ldap.TYPE_OR_VALUE_EXISTS):
cos.list()[0].add(mode['attr'], mode['negative'])
for entry in cos.list():
entry.delete()
if __name__ == '__main__':
......
......@@ -551,7 +551,7 @@ def test_search_dns_ip_aci(topology_st, create_user, aci_subject):
ACI_BODY = ensure_bytes(ACI_TARGET + ACI_ALLOW + ACI_SUBJECT)
topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_REPLACE, 'aci', ACI_BODY)])
log.info('Set user bind')
conn = create_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD, uri=f'ldap://{IP_ADDRESS}:{topology_st.standalone.port}')
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......
......@@ -11,14 +11,17 @@ import socket
import ldap
import pytest
import uuid
import time
from lib389 import DirSrv
from lib389.utils import *
from lib389.tasks import *
from lib389.tools import DirSrvTools
from lib389.topologies import topology_st
from lib389._constants import DEFAULT_SUFFIX, DN_DM, PASSWORD
from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
from lib389.idm.directorymanager import DirectoryManager
from lib389.plugins import RootDNAccessControlPlugin
pytestmark = pytest.mark.tier1
logging.getLogger(__name__).setLevel(logging.DEBUG)
......@@ -51,43 +54,32 @@ def rootdn_setup(topology_st):
- Allowed host *
- Denied host *
* means mulitple valued
* means multiple valued
"""
log.info('Initializing root DN test suite...')
global inst
inst = topology_st.standalone
#
# Set an aci so we can modify the plugin after we deny the Root DN
#
ACI = ('(target ="ldap:///cn=config")(targetattr = "*")(version 3.0' +
';acl "all access";allow (all)(userdn="ldap:///anyone");)')
assert inst.config.set('aci', ACI)
#
# Create a user to modify the config
#
users = UserAccounts(inst, DEFAULT_SUFFIX)
TEST_USER_PROPERTIES['userpassword'] = PASSWORD
global user
user = users.create(properties=TEST_USER_PROPERTIES)
#
# Enable dynamic plugins
#
assert inst.config.set('nsslapd-dynamic-plugins', 'on')
topology_st.standalone.config.set('nsslapd-dynamic-plugins', 'on')
#
# Enable the plugin (after enabling dynamic plugins)
#
# Enable the plugin
global plugin
plugin = RootDNAccessControlPlugin(inst)
plugin = RootDNAccessControlPlugin(topology_st.standalone)
plugin.enable()
log.info('test_rootdn_init: Initialized root DN test suite.')
def rootdn_bind(inst, uri=None, fail=False):
"""Helper function to test root DN bind
"""
newinst = DirSrv(verbose=False)
args = {SER_PORT: inst.port,
SER_SERVERID_PROP: inst.serverid}
newinst.allocate(args)
newinst.open(uri=uri, connOnly=True) # This binds as root dn
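A usage sketch of the helper as the tests below employ it: when the Root DN Access Control plugin denies the connection, the open() call raises ldap.UNWILLING_TO_PERFORM, and once the configuration is relaxed the same call simply succeeds (the URI here is only an example):
# Sketch of how the tests below drive rootdn_bind (uri value is an example).
uri = 'ldap://127.0.0.1:{}'.format(inst.port)
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
    rootdn_bind(inst, uri=uri)   # plugin configuration currently denies this address
rootdn_bind(inst, uri=uri)       # succeeds after the deny rule is changed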
def test_rootdn_access_specific_time(topology_st, rootdn_setup, rootdn_cleanup):
"""Test binding inside and outside of a specific time
......@@ -108,6 +100,7 @@ def test_rootdn_access_specific_time(topology_st, rootdn_setup, rootdn_cleanup):
"""
log.info('Running test_rootdn_access_specific_time...')
dm = DirectoryManager(topology_st.standalone)
# Get the current time, and bump it ahead two hours
current_hour = time.strftime("%H")
......@@ -120,27 +113,19 @@ def test_rootdn_access_specific_time(topology_st, rootdn_setup, rootdn_cleanup):
assert plugin.replace_many(('rootdn-open-time', open_time),
('rootdn-close-time', close_time))
time.sleep(.5)
#
# Bind as Root DN - should fail
#
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
inst.simple_bind_s(DN_DM, PASSWORD)
dm.bind()
#
# Set config to allow the entire day
#
assert inst.simple_bind_s(user.dn, PASSWORD)
assert plugin.replace_many(('rootdn-open-time', '0000'),
('rootdn-close-time', '2359'))
time.sleep(.5)
dm.bind()
assert inst.simple_bind_s(DN_DM, PASSWORD)
#
# Cleanup - undo the changes we made so the next test has a clean slate
#
assert plugin.apply_mods([(ldap.MOD_DELETE, 'rootdn-open-time'),
(ldap.MOD_DELETE, 'rootdn-close-time')])
......@@ -163,6 +148,7 @@ def test_rootdn_access_day_of_week(topology_st, rootdn_setup, rootdn_cleanup):
"""
log.info('Running test_rootdn_access_day_of_week...')
dm = DirectoryManager(topology_st.standalone)
days = ('Sun', 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat')
day = int(time.strftime("%w", time.gmtime()))
......@@ -182,24 +168,20 @@ def test_rootdn_access_day_of_week(topology_st, rootdn_setup, rootdn_cleanup):
log.info('Allowed days: ' + allow_days)
log.info('Deny days: ' + deny_days)
#
# Set the deny days
#
plugin.set_days_allowed(deny_days)
time.sleep(.5)
#
# Bind as Root DN - should fail
#
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
inst.simple_bind_s(DN_DM, PASSWORD)
dm.bind()
#
# Set the allow days
#
assert inst.simple_bind_s(user.dn, PASSWORD)
plugin.set_days_allowed(allow_days)
assert inst.simple_bind_s(DN_DM, PASSWORD)
time.sleep(.5)
dm.bind()
def test_rootdn_access_denied_ip(topology_st, rootdn_setup, rootdn_cleanup):
......@@ -222,23 +204,19 @@ def test_rootdn_access_denied_ip(topology_st, rootdn_setup, rootdn_cleanup):
log.info('Running test_rootdn_access_denied_ip...')
plugin.add_deny_ip('127.0.0.1')
plugin.add_deny_ip('::1')
time.sleep(.5)
#
# Bind as Root DN - should fail
#
conn = ldap.initialize('ldap://{}:{}'.format('127.0.0.1', inst.port))
uri = 'ldap://{}:{}'.format('127.0.0.1', topology_st.standalone.port)
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
conn.simple_bind_s(DN_DM, PASSWORD)
rootdn_bind(topology_st.standalone, uri=uri)
#
# Change the denied IP so root DN succeeds
#
assert inst.simple_bind_s(user.dn, PASSWORD)
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-deny-ip', '255.255.255.255')])
time.sleep(.5)
conn = ldap.initialize('ldap://{}:{}'.format('127.0.0.1', inst.port))
assert conn.simple_bind_s(DN_DM, PASSWORD)
# Bind should succeed
rootdn_bind(topology_st.standalone, uri=uri)
def test_rootdn_access_denied_host(topology_st, rootdn_setup, rootdn_cleanup):
......@@ -263,22 +241,19 @@ def test_rootdn_access_denied_host(topology_st, rootdn_setup, rootdn_cleanup):
plugin.add_deny_host(hostname)
if localhost != hostname:
plugin.add_deny_host(localhost)
time.sleep(.5)
#
# Bind as Root DN - should fail
#
conn = ldap.initialize('ldap://{}:{}'.format(localhost, inst.port))
uri = 'ldap://{}:{}'.format(localhost, topology_st.standalone.port)
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
conn.simple_bind_s(DN_DM, PASSWORD)
rootdn_bind(topology_st.standalone, uri=uri)
#
# Change the denied host so root DN succeeds
#
assert inst.simple_bind_s(user.dn, PASSWORD)
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-deny-host', 'i.dont.exist.{}'.format(uuid.uuid4()))])
time.sleep(.5)
conn = ldap.initialize('ldap://{}:{}'.format(hostname, inst.port))
assert conn.simple_bind_s(DN_DM, PASSWORD)
# Bind should succeed
rootdn_bind(topology_st.standalone, uri=uri)
def test_rootdn_access_allowed_ip(topology_st, rootdn_setup, rootdn_cleanup):
......@@ -300,27 +275,22 @@ def test_rootdn_access_allowed_ip(topology_st, rootdn_setup, rootdn_cleanup):
log.info('Running test_rootdn_access_allowed_ip...')
#
# Set allowed ip to 255.255.255.255 - blocks the Root DN
#
plugin.add_allow_ip('255.255.255.255')
time.sleep(.5)
#
# Bind as Root DN - should fail
#
conn = ldap.initialize('ldap://{}:{}'.format(localhost, inst.port))
uri = 'ldap://{}:{}'.format(localhost, topology_st.standalone.port)
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
conn.simple_bind_s(DN_DM, PASSWORD)
rootdn_bind(topology_st.standalone, uri=uri)
#
# Allow localhost
#
assert inst.simple_bind_s(user.dn, PASSWORD)
plugin.add_allow_ip('127.0.0.1')
plugin.add_allow_ip('::1')
time.sleep(.5)
conn = ldap.initialize('ldap://{}:{}'.format(localhost, inst.port))
assert conn.simple_bind_s(DN_DM, PASSWORD)
# Bind should succeed
rootdn_bind(topology_st.standalone, uri=uri)
def test_rootdn_access_allowed_host(topology_st, rootdn_setup, rootdn_cleanup):
......@@ -342,29 +312,25 @@ def test_rootdn_access_allowed_host(topology_st, rootdn_setup, rootdn_cleanup):
log.info('Running test_rootdn_access_allowed_host...')
#
# Set allowed host to an unknown host - blocks the Root DN
#
plugin.add_allow_host('i.dont.exist.{}'.format(uuid.uuid4()))
time.sleep(.5)
#
# Bind as Root DN - should fail
#
conn = ldap.initialize('ldap://{}:{}'.format(localhost, inst.port))
uri = 'ldap://{}:{}'.format(localhost, topology_st.standalone.port)
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
conn.simple_bind_s(DN_DM, PASSWORD)
rootdn_bind(topology_st.standalone, uri=uri)
#
# Allow localhost
#
assert inst.simple_bind_s(user.dn, PASSWORD)
plugin.remove_all_allow_host()
plugin.add_allow_host(localhost)
if hostname != localhost:
plugin.add_allow_host(hostname)
time.sleep(.5)
# Bind should succeed
rootdn_bind(topology_st.standalone, uri=uri)
conn = ldap.initialize('ldap://{}:{}'.format(localhost, inst.port))
assert conn.simple_bind_s(DN_DM, PASSWORD)
def test_rootdn_config_validate(topology_st, rootdn_setup, rootdn_cleanup):
"""Test plugin configuration validation
......@@ -421,9 +387,7 @@ def test_rootdn_config_validate(topology_st, rootdn_setup, rootdn_cleanup):
23. Should fail
"""
#
# Test rootdn-open-time
#
# Test invalid values for all settings
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
log.info('Add just "rootdn-open-time"')
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-open-time', '0000')])
......@@ -444,10 +408,7 @@ def test_rootdn_config_validate(topology_st, rootdn_setup, rootdn_cleanup):
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-open-time','aaaaa'),
(ldap.MOD_REPLACE, 'rootdn-close-time', '0000')])
#
# Test rootdn-close-time
#
log.info('Add just "rootdn-close-time"')
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-close-time', '0000')])
......@@ -467,10 +428,7 @@ def test_rootdn_config_validate(topology_st, rootdn_setup, rootdn_cleanup):
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-open-time','0000'),
(ldap.MOD_REPLACE, 'rootdn-close-time','aaaaa')])
#
# Test days allowed
#
log.info('Add multiple "rootdn-days-allowed"')
plugin.apply_mods([(ldap.MOD_ADD, 'rootdn-days-allowed', 'Mon'),
(ldap.MOD_ADD, 'rootdn-days-allowed', 'Tue')])
......@@ -481,31 +439,23 @@ def test_rootdn_config_validate(topology_st, rootdn_setup, rootdn_cleanup):
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-days-allowed', 'm111m')])
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-days-allowed', 'Gur')])
#
# Test allow ips
#
log.info('Add invalid "rootdn-allow-ip"')
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-allow-ip', '12.12.Z.12')])
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-allow-ip', '123.234.345.456')])
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-allow-ip', ':::')])
#
# Test deny ips
#
log.info('Add invalid "rootdn-deny-ip"')
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-deny-ip', '12.12.Z.12')])
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-deny-ip', '123.234.345.456')])
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-deny-ip', ':::')])
#
# Test allow hosts
#
log.info('Add invalid "rootdn-allow-host"')
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-allow-host', 'host._.com')])
#
# Test deny hosts
#
log.info('Add invalid "rootdn-deny-host"')
plugin.apply_mods([(ldap.MOD_REPLACE, 'rootdn-deny-host', 'host.####.com')])
......
......@@ -148,7 +148,7 @@ def test_rename_large_subtree(topology_m2):
assert len(members) == UCOUNT
# Wait for replication
repl.wait_for_replication(st, m2)
repl.wait_for_replication(st, m2, timeout=60)
for i in range(0, 5):
# Move ou=s1 to ou=account as parent. We have to provide the rdn,
......@@ -169,7 +169,7 @@ def test_rename_large_subtree(topology_m2):
assert 'ou=int' in member
# Check everything on the other side is good.
repl.wait_for_replication(st, m2)
repl.wait_for_replication(st, m2, timeout=60)
group2 = Groups(m2, DEFAULT_SUFFIX).get('default_group')
......
......@@ -472,6 +472,8 @@ def test_invalid_agmt(topo_m4):
except ldap.LDAPError as e:
m1.log.fatal('Failed to bind: ' + str(e))
assert False
def test_warining_for_invalid_replica(topo_m4):
"""Testing logs to indicate the inconsistency when configuration is performed.
......
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2018 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import logging
import pytest
from lib389.tasks import *
from lib389.topologies import topology_m2 as topo_m2
from lib389.utils import *
from lib389.replica import *
from lib389._constants import *
from lib389.idm.user import UserAccounts
from lib389.idm.domain import Domain
pytestmark = pytest.mark.tier1
log = logging.getLogger(__name__)
@pytest.mark.DS47950
def test_nsslapd_plugin_binddn_tracking(topo_m2):
"""
Testing nsslapd-plugin-binddn-tracking does not cause issues around
access control and reconfiguring replication/repl agmt.
:id: f5ba7b64-fe04-11e8-a298-8c16451d917b
:setup: Replication with two masters.
:steps:
1. Turn on bind dn tracking
2. Add two users
3. Add an aci
4. Make modification as user
5. Setup replica and create a repl agmt
6. Modify replica
7. Modify repl agmt
:expectedresults:
1. Should succeed.
2. Should succeed.
3. Should succeed.
4. Should succeed.
5. Should succeed.
6. Should succeed.
7. Should succeed.
"""
log.info("Testing Ticket 47950 - Testing nsslapd-plugin-binddn-tracking")
#
# Turn on bind dn tracking
#
topo_m2.ms["master1"].config.replace("nsslapd-plugin-binddn-tracking", "on")
#
# Add two users
#
users = UserAccounts(topo_m2.ms["master1"], DEFAULT_SUFFIX)
test_user_1 = users.create_test_user(uid=1)
test_user_2 = users.create_test_user(uid=2)
test_user_1.set('userPassword', 'password')
test_user_2.set('userPassword', 'password')
#
# Add an aci
#
USER1_DN = users.list()[0].dn
USER2_DN = users.list()[1].dn
acival = (
'(targetattr ="cn")(version 3.0;acl "Test bind dn tracking"'
+ ';allow (all) (userdn = "ldap:///%s");)' % USER1_DN
)
Domain(topo_m2.ms["master1"], DEFAULT_SUFFIX).add("aci", acival)
#
# Make modification as user
#
assert topo_m2.ms["master1"].simple_bind_s(USER1_DN, "password")
test_user_2.replace("cn", "new value")
#
# Setup replica and create a repl agmt
#
repl = ReplicationManager(DEFAULT_SUFFIX)
assert topo_m2.ms["master1"].simple_bind_s(DN_DM, PASSWORD)
repl.test_replication(topo_m2.ms["master1"], topo_m2.ms["master2"], 30)
repl.test_replication(topo_m2.ms["master2"], topo_m2.ms["master1"], 30)
properties = {
"cn": "test_agreement",
"nsDS5ReplicaRoot": "dc=example,dc=com",
"nsDS5ReplicaHost": "localhost.localdomain",
"nsDS5ReplicaPort": "5555",
"nsDS5ReplicaBindDN": "uid=tester",
"nsds5ReplicaCredentials": "password",
"nsDS5ReplicaTransportInfo": "LDAP",
"nsDS5ReplicaBindMethod": "SIMPLE",
}
replicas = Replicas(topo_m2.ms["master1"])
replica = replicas.get(DEFAULT_SUFFIX)
agmts = Agreements(topo_m2.ms["master1"], basedn=replica.dn)
repl_agreement = agmts.create(properties=properties)
#
# modify replica
#
replica.replace("nsDS5ReplicaId", "7")
assert replica.present("nsDS5ReplicaId", "7")
#
# modify repl agmt
#
repl_agreement.replace('nsDS5ReplicaPort', "8888")
assert repl_agreement.present('nsDS5ReplicaPort', "8888")
if __name__ == "__main__":
# Run isolated
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s %s" % CURRENT_FILE)
......@@ -14,13 +14,16 @@ from lib389.utils import *
from lib389.topologies import topology_m2 as topo_m2, TopologyMain, topology_m3 as topo_m3, create_topology, _remove_ssca_db
from lib389._constants import *
from lib389.idm.organizationalunit import OrganizationalUnits
from lib389.agreement import Agreements
from lib389.idm.user import UserAccount
from lib389.idm.group import Groups, Group
from lib389.idm.domain import Domain
from lib389.idm.directorymanager import DirectoryManager
from lib389.replica import Replicas, ReplicationManager
from lib389.agreement import Agreements
from lib389.changelog import Changelog5
from lib389 import pid_from_file
pytestmark = pytest.mark.tier1
NEW_SUFFIX_NAME = 'test_repl'
......@@ -489,9 +492,56 @@ def test_fetch_bindDnGroup(topo_m2):
count = pattern_errorlog(errorlog_M2, regex, start_location=restart_location_M2)
assert(count <= 1)
if DEBUGGING:
# Add debugging steps(if any)...
pass
def test_plugin_bind_dn_tracking_and_replication(topo_m2):
"""Testing nsslapd-plugin-binddn-tracking does not cause issues around
access control and reconfiguring replication/repl agmt.
:id: dd689d03-69b8-4bf9-a06e-2acd19d5e2c9
:setup: 2 master topology
:steps:
1. Turn on plugin binddn tracking
2. Add some users
3. Make an update as a user
4. Make an update to the replica config
5. Make an update to the replication agreement
:expectedresults:
1. Success
2. Success
3. Success
4. Success
5. Success
"""
m1 = topo_m2.ms["master1"]
# Turn on bind dn tracking
m1.config.set('nsslapd-plugin-binddn-tracking', 'on')
# Add two users
users = UserAccounts(m1, DEFAULT_SUFFIX)
user1 = users.create_test_user(uid=1011)
user1.set('userpassword', PASSWORD)
user2 = users.create_test_user(uid=1012)
# Add an aci
acival = '(targetattr ="cn")(version 3.0;acl "Test bind dn tracking"' + \
';allow (all) (userdn = "ldap:///{}");)'.format(user1.dn)
Domain(m1, DEFAULT_SUFFIX).add('aci', acival)
# Bind as user and make an update
user1.rebind(PASSWORD)
user2.set('cn', 'new value')
dm = DirectoryManager(m1)
dm.rebind()
# modify replica
replica = Replicas(m1).get(DEFAULT_SUFFIX)
replica.set(REPL_PROTOCOL_TIMEOUT, "30")
# modify repl agmt
agmt = replica.get_agreements().list()[0]
agmt.set(REPL_PROTOCOL_TIMEOUT, "20")
def test_cleanallruv_repl(topo_m3):
......
......@@ -4,11 +4,11 @@ import copy
import os
import ldap
from lib389._constants import *
from lib389 import Entry
from lib389.topologies import topology_st as topo
from lib389.replica import Replicas
from lib389.agreement import Agreements
from lib389.utils import ds_is_older
pytestmark = pytest.mark.tier1
......@@ -104,12 +104,14 @@ def agmt_setup(topo):
def perform_invalid_create(many, properties, attr, value):
my_properties = copy.deepcopy(properties)
my_properties[attr] = value
with pytest.raises(ldap.LDAPError):
with pytest.raises(ldap.LDAPError) as ei:
many.create(properties=my_properties)
return ei.value
def perform_invalid_modify(o, attr, value):
with pytest.raises(ldap.LDAPError):
with pytest.raises(ldap.LDAPError) as ei:
o.replace(attr, value)
return ei.value
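Returning ei.value from these helpers lets the caller compare the exact exception class raised by different operations, which is what the new test added below relies on. A self-contained illustration of the pytest.raises pattern, using a dummy exception instead of the ldap errors:
# pytest.raises exposes the caught exception via ei.value, so a helper can
# hand it back to the caller for type comparison.
import pytest

def _failing():
    raise ValueError("bad attribute value")

def _perform_invalid_op():
    with pytest.raises(ValueError) as ei:
        _failing()
    return ei.value

assert type(_perform_invalid_op()) is ValueError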
@pytest.mark.parametrize("attr, too_small, too_big, overflow, notnum, valid", repl_add_attrs)
def test_replica_num_add(topo, attr, too_small, too_big, overflow, notnum, valid):
......@@ -254,9 +256,26 @@ def test_agmt_num_modify(topo, attr, too_small, too_big, overflow, notnum, valid
# Value is valid
agmt.replace(attr, valid)
@pytest.mark.skipif(ds_is_older('1.4.1.4'), reason="Not implemented")
@pytest.mark.bz1546739
def test_same_attr_yields_same_return_code(topo):
"""Test that various operations with same incorrect attribute value yield same return code
"""
attr = 'nsDS5ReplicaId'
too_big = '65535'  # assumed out-of-range replica id used for the invalid create/modify below
replica_reset(topo)
replicas = Replicas(topo.standalone)
e = perform_invalid_create(replicas, replica_dict, attr, too_big)
assert type(e) is ldap.UNWILLING_TO_PERFORM
replica = replica_setup(topo)
e = perform_invalid_modify(replica, attr, too_big)
assert type(e) is ldap.UNWILLING_TO_PERFORM
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main(["-s", CURRENT_FILE])
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2017 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import pytest
from lib389.tasks import *
from lib389.utils import *
from lib389.topologies import topology_m1
from lib389.tombstone import Tombstones
from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
from lib389.replica import ReplicationManager
from lib389._constants import (defaultProperties, DEFAULT_SUFFIX, ReplicaRole,
REPLICAID_MASTER_1, REPLICA_PRECISE_PURGING, REPLICA_PURGE_DELAY,
REPLICA_PURGE_INTERVAL)
pytestmark = pytest.mark.tier2
def test_precise_tombstone_purging(topology_m1):
""" Test precise tombstone purging
:id: adb86f50-ae76-4ed6-82b4-3cdc30ccab79
:setup: master1 instance
:steps:
1. Create and Delete entry to create a tombstone
2. export ldif, edit, and import ldif
3. Check tombstones do not contain nsTombstoneCSN
4. Run fixup task, and verify tombstones now have nsTombstone CSN
5. Configure tombstone purging
6. Verify tombstones are purged
:expectedresults:
1. Success
2. Success
3. Success
4. Success
5. Success
6. Success
"""
m1 = topology_m1.ms['master1']
m1_tasks = Tasks(m1)
# Create tombstone entry
users = UserAccounts(m1, DEFAULT_SUFFIX)
user = users.create_test_user(uid=1001)
user.delete()
# Verify tombstone was created
tombstones = Tombstones(m1, DEFAULT_SUFFIX)
assert len(tombstones.list()) == 1
# Export db, strip nsTombstoneCSN, and import it
ldif_file = "{}/export.ldif".format(m1.get_ldif_dir())
args = {EXPORT_REPL_INFO: True,
TASK_WAIT: True}
m1_tasks.exportLDIF(DEFAULT_SUFFIX, None, ldif_file, args)
time.sleep(.5)
# Strip LDIF of nsTombstoneCSN: get the LDIF lines, then create a new ldif
ldif = open(ldif_file, "r")
lines = ldif.readlines()
ldif.close()
time.sleep(.5)
ldif = open(ldif_file, "w")
for line in lines:
if not line.lower().startswith('nstombstonecsn'):
ldif.write(line)
ldif.close()
time.sleep(.5)
# import the new ldif file
log.info('Import replication LDIF file...')
args = {TASK_WAIT: True}
m1_tasks.importLDIF(DEFAULT_SUFFIX, None, ldif_file, args)
time.sleep(.5)
# Search for the tombstone again
tombstones = Tombstones(m1, DEFAULT_SUFFIX)
assert len(tombstones.list()) == 1
#
# Part 3 - test fixup task using the strip option.
#
args = {TASK_WAIT: True,
TASK_TOMB_STRIP: True}
m1_tasks.fixupTombstones(DEFAULT_BENAME, args)
time.sleep(.5)
# Search for tombstones with nsTombstoneCSN - better not find any
for ts in tombstones.list():
assert not ts.present("nsTombstoneCSN")
# Now run the fixup task
args = {TASK_WAIT: True}
m1_tasks.fixupTombstones(DEFAULT_BENAME, args)
time.sleep(.5)
# Search for tombstones with nsTombstoneCSN - better find some
tombstones = Tombstones(m1, DEFAULT_SUFFIX)
assert len(tombstones.list()) == 1
#
# Part 4 - Test tombstone purging
#
args = {REPLICA_PRECISE_PURGING: b'on',
REPLICA_PURGE_DELAY: b'5',
REPLICA_PURGE_INTERVAL: b'5'}
m1.replica.setProperties(DEFAULT_SUFFIX, None, None, args)
# Wait for the interval to pass
log.info('Wait for tombstone purge interval to pass...')
time.sleep(6)
# Add an entry to trigger replication
users.create_test_user(uid=1002)
# Wait for the interval to pass again
log.info('Wait for tombstone purge interval to pass again...')
time.sleep(6)
# search for tombstones, there should be none
tombstones = Tombstones(m1, DEFAULT_SUFFIX)
assert len(tombstones.list()) == 0
......@@ -10,12 +10,12 @@ import pytest
from lib389.tasks import *
from lib389.utils import *
from lib389.topologies import topology_m1
from lib389.tombstone import Tombstones
from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
pytestmark = pytest.mark.tier1
def test_purge_success(topology_m1):
"""Verify that tombstones are created successfully
......@@ -55,6 +55,7 @@ def test_purge_success(topology_m1):
assert len(users.list()) == 1
user_revived = users.get('testuser')
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
......
......@@ -2,9 +2,11 @@ import logging
import pytest
import os
import ldap
import resource
from lib389._constants import *
from lib389.topologies import topology_st
from lib389.utils import ds_is_older
from lib389.utils import ds_is_older, ensure_str
from subprocess import check_output
pytestmark = pytest.mark.tier1
......@@ -12,9 +14,11 @@ logging.getLogger(__name__).setLevel(logging.INFO)
log = logging.getLogger(__name__)
FD_ATTR = "nsslapd-maxdescriptors"
SYSTEMD_VAL = "16384"
CUSTOM_VAL = "9000"
TOO_HIGH_VAL = "65536"
GLOBAL_LIMIT = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
SYSTEMD_LIMIT = ensure_str(check_output("systemctl show --value -p LimitNOFILE dirsrv@standalone1".split(" ")).strip())
CUSTOM_VAL = str(int(SYSTEMD_LIMIT) - 10)
TOO_HIGH_VAL = str(GLOBAL_LIMIT * 2)
TOO_HIGH_VAL2 = str(int(SYSTEMD_LIMIT) * 2)
TOO_LOW_VAL = "0"
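The derived constants keep the test values relative to the limits actually present on the machine instead of the old hard-coded 16384/9000/65536. A quick check of the relationships, assuming SYSTEMD_LIMIT and GLOBAL_LIMIT were obtained as above:
# How the test values relate to the discovered limits.
systemd_limit = int(SYSTEMD_LIMIT)
assert int(CUSTOM_VAL) == systemd_limit - 10       # valid: just under LimitNOFILE
assert int(TOO_HIGH_VAL) == GLOBAL_LIMIT * 2       # rejected: above the system hard limit
assert int(TOO_HIGH_VAL2) == systemd_limit * 2     # rejected: above the systemd LimitNOFILE
assert int(TOO_LOW_VAL) == 0                       # rejected: too low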
@pytest.mark.skipif(ds_is_older("1.4.1.2"), reason="Not implemented")
......@@ -26,7 +30,7 @@ def test_fd_limits(topology_st):
:steps:
1. Check default limit
2. Change default limit
3. Check invalid/too high limit is rejected
3. Check invalid/too high limits are rejected
4. Check invalid/too low limit is rejected
:expectedresults:
1. Success
......@@ -37,19 +41,25 @@ def test_fd_limits(topology_st):
# Check systemd default
max_fd = topology_st.standalone.config.get_attr_val_utf8(FD_ATTR)
assert max_fd == SYSTEMD_VAL
assert max_fd == SYSTEMD_LIMIT
# Check custom value is applied
topology_st.standalone.config.set(FD_ATTR, CUSTOM_VAL)
max_fd = topology_st.standalone.config.get_attr_val_utf8(FD_ATTR)
assert max_fd == CUSTOM_VAL
# Attempt to use val that is too high
# Attempt to use value that is higher than the global system limit
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
topology_st.standalone.config.set(FD_ATTR, TOO_HIGH_VAL)
max_fd = topology_st.standalone.config.get_attr_val_utf8(FD_ATTR)
assert max_fd == CUSTOM_VAL
# Attempt to use value that is higher than the value defined in the systemd service
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
topology_st.standalone.config.set(FD_ATTR, TOO_HIGH_VAL2)
max_fd = topology_st.standalone.config.get_attr_val_utf8(FD_ATTR)
assert max_fd == CUSTOM_VAL
# Attempt to use val that is too low
with pytest.raises(ldap.OPERATIONS_ERROR):
topology_st.standalone.config.set(FD_ATTR, TOO_LOW_VAL)
......
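For reference, the boundary values used in this test come from two different ceilings: the hard RLIMIT_NOFILE of the process and LimitNOFILE of the dirsrv systemd unit. A standalone sketch of that derivation, mirroring the constants above and assuming the instance runs as the dirsrv@standalone1 unit with a numeric LimitNOFILE:
import resource
from subprocess import check_output

hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)[1]   # OS hard limit for this process
unit_limit = int(check_output(
    ["systemctl", "show", "--value", "-p", "LimitNOFILE",
     "dirsrv@standalone1"]).decode().strip())                 # limit set on the systemd unit

custom_val = str(unit_limit - 10)       # accepted: just below the unit limit
too_high_val = str(hard_limit * 2)      # rejected (UNWILLING_TO_PERFORM): above the OS hard limit
too_high_val2 = str(unit_limit * 2)     # rejected (UNWILLING_TO_PERFORM): above the unit limit
too_low_val = "0"                       # rejected (OPERATIONS_ERROR): below the allowed minimum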
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2016 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import logging
import pytest
from lib389.tasks import *
from lib389.topologies import topology_st
from lib389.replica import ReplicationManager
from lib389._constants import (defaultProperties, DEFAULT_SUFFIX, ReplicaRole,
REPLICAID_MASTER_1, REPLICA_PRECISE_PURGING, REPLICA_PURGE_DELAY,
REPLICA_PURGE_INTERVAL)
from lib389.utils import *
# Skip on older versions
pytestmark = [pytest.mark.tier2,
              pytest.mark.skipif(ds_is_older('1.3.4'), reason="Not implemented")]
log = logging.getLogger(__name__)
def test_ticket47819(topology_st):
"""
Testing precise tombstone purging:
[1] Make sure "nsTombstoneCSN" is added to new tombstones
[2] Make sure an import of a replication ldif adds "nsTombstoneCSN"
to old tombstones
[3] Test fixup task
[4] Make sure tombstone purging works
"""
log.info('Testing Ticket 47819 - Test precise tombstone purging')
#
# Setup Replication
#
master = topology_st.standalone
repl = ReplicationManager(DEFAULT_SUFFIX)
repl.create_first_master(master)
repl.ensure_agreement(master, master)
#
# Part 1 create a tombstone entry and make sure nsTombstoneCSN is added
#
log.info('Part 1: Add and then delete an entry to create a tombstone...')
try:
topology_st.standalone.add_s(Entry(('cn=entry1,dc=example,dc=com', {
'objectclass': 'top person'.split(),
'sn': 'user',
'cn': 'entry1'})))
except ldap.LDAPError as e:
log.error('Failed to add entry: ' + str(e))
assert False
try:
topology_st.standalone.delete_s('cn=entry1,dc=example,dc=com')
except ldap.LDAPError as e:
log.error('Failed to delete entry: ' + str(e))
assert False
log.info('Search for tombstone entries...')
try:
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
'(&(nsTombstoneCSN=*)(objectclass=nsTombstone))')
if not entries:
log.fatal('Search failed to find the new tombstone (nsTombstoneCSN is probably missing).')
assert False
except ldap.LDAPError as e:
log.fatal('Search failed: ' + str(e))
assert False
log.info('Part 1 - passed')
#
# Part 2 - import ldif with tombstones missing 'nsTombstoneCSN'
#
# First, export the replication ldif, edit the file (remove nstombstonecsn),
# and reimport it.
#
log.info('Part 2: Exporting replication ldif...')
# Get the full path and name of the LDIF we will be exporting
ldif_file = "/tmp/export.ldif"
args = {EXPORT_REPL_INFO: True,
TASK_WAIT: True}
exportTask = Tasks(topology_st.standalone)
try:
exportTask.exportLDIF(DEFAULT_SUFFIX, None, ldif_file, args)
except ValueError:
assert False
time.sleep(1)
# open the ldif file, get the lines, then rewrite the file
ldif = open(ldif_file, "r")
lines = ldif.readlines()
ldif.close()
time.sleep(1)
ldif = open(ldif_file, "w")
for line in lines:
if not line.lower().startswith('nstombstonecsn'):
ldif.write(line)
ldif.close()
time.sleep(1)
# import the new ldif file
log.info('Import replication LDIF file...')
importTask = Tasks(topology_st.standalone)
args = {TASK_WAIT: True}
try:
importTask.importLDIF(DEFAULT_SUFFIX, None, ldif_file, args)
os.remove(ldif_file)
except ValueError:
os.remove(ldif_file)
assert False
time.sleep(1)
# Search for the tombstone again
log.info('Search for tombstone entries...')
try:
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
'(&(nsTombstoneCSN=*)(objectclass=nsTombstone))')
if not entries:
log.fatal('Search failed to find the new tombstone (nsTombstoneCSN is probably missing).')
assert False
except ldap.LDAPError as e:
log.fatal('Search failed: ' + str(e))
assert False
log.info('Part 2 - passed')
#
# Part 3 - test fixup task
#
log.info('Part 3: test the fixup task')
# Run fixup task using the strip option. This removes nsTombstoneCSN
# so we can test if the fixup task works.
args = {TASK_WAIT: True,
TASK_TOMB_STRIP: True}
fixupTombTask = Tasks(topology_st.standalone)
try:
fixupTombTask.fixupTombstones(DEFAULT_BENAME, args)
except:
assert False
time.sleep(1)
# Search for tombstones with nsTombstoneCSN - better not find any
log.info('Search for tombstone entries...')
try:
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
'(&(nsTombstoneCSN=*)(objectclass=nsTombstone))')
if entries:
log.fatal('Search found tombstones with nsTombstoneCSN')
assert False
except ldap.LDAPError as e:
log.fatal('Search failed: ' + str(e))
assert False
# Now run the fixup task
args = {TASK_WAIT: True}
fixupTombTask = Tasks(topology_st.standalone)
try:
fixupTombTask.fixupTombstones(DEFAULT_BENAME, args)
except:
assert False
time.sleep(1)
# Search for tombstones with nsTombstoneCSN - better find some
log.info('Search for tombstone entries...')
try:
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
'(&(nsTombstoneCSN=*)(objectclass=nsTombstone))')
if not entries:
log.fatal('Search did not find any fixed-up tombstones')
assert False
except ldap.LDAPError as e:
log.fatal('Search failed: ' + str(e))
assert False
log.info('Part 3 - passed')
#
# Part 4 - Test tombstone purging
#
log.info('Part 4: test tombstone purging...')
args = {REPLICA_PRECISE_PURGING: b'on',
REPLICA_PURGE_DELAY: b'5',
REPLICA_PURGE_INTERVAL: b'5'}
try:
topology_st.standalone.replica.setProperties(DEFAULT_SUFFIX, None, None, args)
except:
log.fatal('Failed to configure replica')
assert False
# Wait for the interval to pass
log.info('Wait for tombstone purge interval to pass...')
time.sleep(10)
# Add an entry to trigger replication
log.info('Perform an update to help trigger tombstone purging...')
try:
topology_st.standalone.add_s(Entry(('cn=test_entry,dc=example,dc=com', {
'objectclass': 'top person'.split(),
'sn': 'user',
'cn': 'test_entry'})))
except ldap.LDAPError as e:
log.error('Failed to add entry: ' + str(e))
assert False
# Wait for the interval to pass again
log.info('Wait for tombstone purge interval to pass again...')
time.sleep(10)
# search for tombstones, there should be none
log.info('Search for tombstone entries...')
try:
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE,
'(&(nsTombstoneCSN=*)(objectclass=nsTombstone))')
if entries:
log.fatal('Search unexpectedly found tombstones')
assert False
except ldap.LDAPError as e:
log.fatal('Search failed: ' + str(e))
assert False
log.info('Part 4 - passed')
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main(["-s", CURRENT_FILE])