Skip to content
Commits on Source (29)
......@@ -2150,7 +2150,6 @@ TESTS = test_slapd \
test_slapd_SOURCES = test/main.c \
test/libslapd/test.c \
test/libslapd/counters/atomic.c \
test/libslapd/filter/optimise.c \
test/libslapd/pblock/analytics.c \
test/libslapd/pblock/v3_compat.c \
test/libslapd/operation/v3_compat.c \
......
......@@ -10,7 +10,7 @@ vendor="389 Project"
# PACKAGE_VERSION is constructed from these
VERSION_MAJOR=1
VERSION_MINOR=4
VERSION_MAINT=0.15
VERSION_MAINT=0.18
# NOTE: VERSION_PREREL is automatically set for builds made out of a git tree
VERSION_PREREL=
VERSION_DATE=$(date -u +%Y%m%d)
......
......@@ -135,7 +135,7 @@ AC_MSG_CHECKING(for --enable-asan)
AC_ARG_ENABLE(asan, AS_HELP_STRING([--enable-asan], [Enable gcc/clang address sanitizer options (default: no)]),
[
AC_MSG_RESULT(yes)
asan_cflags="-fsanitize=address -fno-omit-frame-pointer"
asan_cflags="-fsanitize=address -fno-omit-frame-pointer -lasan"
asan_rust_defs="-Z sanitizer=address"
],
[
......
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2016 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import logging
import pytest
from lib389.tasks import *
from lib389.topologies import topology_st as topo
from lib389.utils import *
from lib389._constants import DEFAULT_SUFFIX
from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
from lib389.backend import Backends
from lib389.idm.domain import Domain
USER_DN = 'uid=test_user,%s' % DEFAULT_SUFFIX
logging.getLogger(__name__).setLevel(logging.INFO)
log = logging.getLogger(__name__)
@pytest.fixture(scope="module")
def enable_user_attr_encryption(topo, request):
""" Enables attribute encryption for various attributes
Adds a test user with encrypted attributes
"""
log.info("Enable TLS for attribute encryption")
topo.standalone.enable_tls()
log.info("Enables attribute encryption")
backends = Backends(topo.standalone)
backend = backends.list()[0]
encrypt_attrs = backend.get_encrypted_attrs()
log.info("Enables attribute encryption for employeeNumber and telephoneNumber")
emp_num_encrypt = encrypt_attrs.create(properties={'cn': 'employeeNumber', 'nsEncryptionAlgorithm': 'AES'})
telephone_encrypt = encrypt_attrs.create(properties={'cn': 'telephoneNumber', 'nsEncryptionAlgorithm': '3DES'})
log.info("Add a test user with encrypted attributes")
users = UserAccounts(topo.standalone, DEFAULT_SUFFIX)
test_user = users.create(properties=TEST_USER_PROPERTIES)
test_user.replace('employeeNumber', '1000')
test_user.replace('telephonenumber', '1234567890')
def fin():
log.info("Remove attribute encryption for various attributes")
emp_num_encrypt.delete()
telephone_encrypt.delete()
request.addfinalizer(fin)
return test_user
def test_basic(topo, enable_user_attr_encryption):
"""Tests encrypted attributes with a test user entry
:id: d767d5c8-b934-4b14-9774-bd13480d81b3
:setup: Standalone instance
Enable AES encryption config on employeenumber
Enable 3DES encryption config on telephoneNumber
Add a test user with with encrypted attributes
:steps:
1. Restart the server
2. Check employeenumber encryption enabled
3. Check telephoneNumber encryption enabled
4. Check that encrypted attribute is present for user i.e. telephonenumber
:expectedresults:
1. This should be successful
2. This should be successful
3. This should be successful
4. This should be successful
"""
log.info("Restart the server")
topo.standalone.restart()
backends = Backends(topo.standalone)
backend = backends.list()[0]
encrypt_attrs = backend.get_encrypted_attrs()
log.info("Extracting values of cn from the list of objects in encrypt_attrs")
log.info("And appending the cn values in a list")
enc_attrs_cns = []
for enc_attr in encrypt_attrs.list():
enc_attrs_cns.append(enc_attr.rdn)
log.info("Check employeenumber encryption is enabled")
assert "employeeNumber" in enc_attrs_cns
log.info("Check telephoneNumber encryption is enabled")
assert "telephoneNumber" in enc_attrs_cns
log.info("Check that encrypted attribute is present for user i.e. telephonenumber")
assert enable_user_attr_encryption.present('telephoneNumber')
def test_export_import_ciphertext(topo, enable_user_attr_encryption):
"""Configure attribute encryption, store some data, check that we can export the ciphertext
:id: b433e215-2926-48a5-818f-c21abc40fc2d
:setup: Standalone instance
Enable AES encryption config on employeenumber
Enable 3DES encryption config on telephoneNumber
Add a test user with encrypted attributes
:steps:
1. Export data as ciphertext
2. Check that the attribute is present in the exported file
3. Check that the encrypted value of attribute is not present in the exported file
4. Delete the test user entry with encrypted data
5. Import the previously exported data as ciphertext
6. Check attribute telephoneNumber should be imported
:expectedresults:
1. This should be successful
2. This should be successful
3. This should be successful
4. This should be successful
5. This should be successful
6. This should be successful
"""
log.info("Export data as ciphertext")
export_ldif = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_ciphertext.ldif")
# Offline export
topo.standalone.stop()
if not topo.standalone.db2ldif(bename=DEFAULT_BENAME, suffixes=(DEFAULT_SUFFIX,),
excludeSuffixes=None, encrypt=False, repl_data=None, outputfile=export_ldif):
log.fatal('Failed to run offline db2ldif')
assert False
topo.standalone.start()
log.info("Check that the attribute is present in the exported file")
log.info("Check that the encrypted value of attribute is not present in the exported file")
with open(export_ldif, 'r') as ldif_file:
ldif = ldif_file.read()
assert 'telephonenumber' in ldif
assert 'telephonenumber: 1234567890' not in ldif
log.info("Delete the test user entry with encrypted data")
enable_user_attr_encryption.delete()
log.info("Import data as ciphertext, which was exported previously")
import_ldif = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_ciphertext.ldif")
# Offline export
topo.standalone.stop()
if not topo.standalone.ldif2db(bename=DEFAULT_BENAME, suffixes=(DEFAULT_SUFFIX,),
excludeSuffixes=None, encrypt=False, import_file=import_ldif):
log.fatal('Failed to run offline ldif2db')
assert False
topo.standalone.start()
log.info("Check that the data with encrypted attribute is imported properly")
users = UserAccounts(topo.standalone, DEFAULT_SUFFIX)
user = users.get('testuser')
assert user.present("telephoneNumber")
def test_export_import_plaintext(topo, enable_user_attr_encryption):
"""Configure attribute encryption, store some data, check that we can export the plain text
:id: b171e215-0456-48a5-245f-c21abc40fc2d
:setup: Standalone instance
Enable AES encryption config on employeenumber
Enable 3DES encryption config on telephoneNumber
Add a test user with encrypted attributes
:steps:
1. Export data as plain text
2. Check that the attribute is present in the exported file
3. Check that the encrypted value of attribute is also present in the exported file
4. Delete the test user entry with encrypted data
5. Import data as plaintext
6. Check attribute value of telephoneNumber
:expectedresults:
1. This should be successful
2. This should be successful
3. This should be successful
4. This should be successful
5. This should be successful
6. This should be successful
"""
log.info("Export data as plain text")
export_ldif = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_plaintext.ldif")
# Offline export
topo.standalone.stop()
if not topo.standalone.db2ldif(bename=DEFAULT_BENAME, suffixes=(DEFAULT_SUFFIX,),
excludeSuffixes=None, encrypt=True, repl_data=None, outputfile=export_ldif):
log.fatal('Failed to run offline db2ldif')
assert False
topo.standalone.start()
log.info("Check that the attribute is present in the exported file")
log.info("Check that the plain text value of the encrypted attribute is present in the exported file")
with open(export_ldif, 'r') as ldif_file:
assert 'telephoneNumber: 1234567890' in ldif_file.read()
log.info("Delete the test user entry with encrypted data")
enable_user_attr_encryption.delete()
log.info("Import data as plain text, which was exported previously")
import_ldif = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_plaintext.ldif")
# Offline export
topo.standalone.stop()
if not topo.standalone.ldif2db(bename=DEFAULT_BENAME, suffixes=(DEFAULT_SUFFIX,),
excludeSuffixes=None, encrypt=True, import_file=import_ldif):
log.fatal('Failed to run offline ldif2db')
assert False
topo.standalone.start()
log.info("Check that the attribute is imported properly")
users = UserAccounts(topo.standalone, DEFAULT_SUFFIX)
user = users.get('testuser')
assert user.present("telephoneNumber")
def test_attr_encryption_unindexed(topo, enable_user_attr_encryption):
"""Configure attribute encryption for an un-indexed attribute, check that we can export encrypted data
:id: d3ef38e1-bb5a-44d8-a3a4-4a25a57e3454
:setup: Standalone instance
Enable AES encryption config on employeenumber
Enable 3DES encryption config on telephoneNumber
Add a test user with encrypted attributes
:steps:
1. Export data as cipher text
2. Check that the unindexed attribute employeenumber is present in exported ldif file
3. Check that the unindexed attribute employeenumber value is not present in exported ldif file
:expectedresults:
1. This should be successful
2. This should be successful
3. This should be successful
"""
log.info("Export data as cipher text")
export_ldif = os.path.join(topo.standalone.ds_paths.ldif_dir, "emp_num_ciphertext.ldif")
# Offline export
topo.standalone.stop()
if not topo.standalone.db2ldif(bename=DEFAULT_BENAME, suffixes=(DEFAULT_SUFFIX,),
excludeSuffixes=None, encrypt=False, repl_data=None, outputfile=export_ldif):
log.fatal('Failed to run offline db2ldif')
assert False
topo.standalone.start()
log.info("Check that the attribute is present in the exported file")
log.info("Check that the encrypted value of attribute is not present in the exported file")
with open(export_ldif, 'r') as ldif_file:
ldif = ldif_file.read()
assert 'employeeNumber' in ldif
assert 'employeeNumber: 1000' not in ldif
def test_attr_encryption_multiple_backends(topo, enable_user_attr_encryption):
"""Tests Configuration of attribute encryption for multiple backends
Where both the backends have attribute encryption
:id: f3ef40e1-17d6-44d8-a3a4-4a25a57e9064
:setup: Standalone instance
SSL Enabled
:steps:
1. Add two test backends
2. Configure attribute encryption for telephonenumber in one test backend
3. Configure attribute encryption for employeenumber in another test backend
4. Add a test user in both backends with encrypted attributes
5. Export data as ciphertext from both backends
6. Check that telephoneNumber is encrypted in the ldif file of db1
7. Check that employeeNumber is encrypted in the ldif file of db2
8. Delete both test backends
:expectedresults:
1. This should be successful
2. This should be successful
3. This should be successful
4. This should be successful
5. This should be successful
6. This should be successful
7. This should be successful
8. This should be successful
"""
log.info("Add two test backends")
test_suffix1 = 'dc=test1,dc=com'
test_db1 = 'test_db1'
test_suffix2 = 'dc=test2,dc=com'
test_db2 = 'test_db2'
# Create backends
backends = Backends(topo.standalone)
test_backend1 = backends.create(properties={'cn': test_db1,
'nsslapd-suffix': test_suffix1})
test_backend2 = backends.create(properties={'cn': test_db2,
'nsslapd-suffix': test_suffix2})
# Create the top of the tree
suffix1 = Domain(topo.standalone, test_suffix1)
test1 = suffix1.create(properties={'dc': 'test1'})
suffix2 = Domain(topo.standalone, test_suffix2)
test2 = suffix2.create(properties={'dc': 'test2'})
log.info("Enables attribute encryption for telephoneNumber in test_backend1")
backend1_encrypt_attrs = test_backend1.get_encrypted_attrs()
b1_encrypt = backend1_encrypt_attrs.create(properties={'cn': 'telephoneNumber',
'nsEncryptionAlgorithm': 'AES'})
log.info("Enables attribute encryption for employeeNumber in test_backend2")
backend2_encrypt_attrs = test_backend2.get_encrypted_attrs()
b2_encrypt = backend2_encrypt_attrs.create(properties={'cn': 'employeeNumber',
'nsEncryptionAlgorithm': 'AES'})
log.info("Add a test user with encrypted attributes in both backends")
users = UserAccounts(topo.standalone, test1.dn, None)
test_user = users.create(properties=TEST_USER_PROPERTIES)
test_user.replace('telephoneNumber', '1234567890')
users = UserAccounts(topo.standalone, test2.dn, None)
test_user = users.create(properties=TEST_USER_PROPERTIES)
test_user.replace('employeeNumber', '1000')
log.info("Export data as ciphertext from both backends")
export_db1 = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_db1.ldif")
export_db2 = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_db2.ldif")
# Offline export
topo.standalone.stop()
if not topo.standalone.db2ldif(bename=test_db1, suffixes=(test_suffix1,),
excludeSuffixes=None, encrypt=False, repl_data=None, outputfile=export_db1):
log.fatal('Failed to run offline db2ldif')
assert False
if not topo.standalone.db2ldif(bename=test_db2, suffixes=(test_suffix2,),
excludeSuffixes=None, encrypt=False, repl_data=None, outputfile=export_db2):
log.fatal('Failed to run offline db2ldif')
assert False
topo.standalone.start()
log.info("Check that the attribute is present in the exported file in db1")
log.info("Check that the encrypted value of attribute is not present in the exported file in db1")
with open(export_db1, 'r') as ldif_file:
ldif = ldif_file.read()
assert 'telephoneNumber' in ldif
assert 'telephoneNumber: 1234567890' not in ldif
log.info("Check that the attribute is present in the exported file in db2")
log.info("Check that the encrypted value of attribute is not present in the exported file in db2")
with open(export_db2, 'r') as ldif_file:
ldif = ldif_file.read()
assert 'employeeNumber' in ldif
assert 'employeeNumber: 1000' not in ldif
log.info("Delete test backends")
test_backend1.delete()
test_backend2.delete()
def test_attr_encryption_backends(topo, enable_user_attr_encryption):
"""Tests Configuration of attribute encryption for single backend
where more backends are present
:id: f3ef40e1-17d6-44d8-a3a4-4a25a57e9064
:setup: Standalone instance
SSL Enabled
:steps:
1. Add two test backends
2. Configure attribute encryption for telephoneNumber in one test backend
3. Add a test user in both backends with telephoneNumber
4. Export ldif from both test backends
5. Check that telephonenumber is encrypted in the ldif file of db1
6. Check that telephonenumber is not encrypted in the ldif file of db2
7. Delete both test backends
:expectedresults:
1. This should be successful
2. This should be successful
3. This should be successful
4. This should be successful
5. This should be successful
6. This should be successful
7. This should be successful
"""
log.info("Add two test backends")
test_suffix1 = 'dc=test1,dc=com'
test_db1 = 'test_db1'
test_suffix2 = 'dc=test2,dc=com'
test_db2 = 'test_db2'
# Create backends
backends = Backends(topo.standalone)
test_backend1 = backends.create(properties={'cn': test_db1,
'nsslapd-suffix': test_suffix1})
test_backend2 = backends.create(properties={'cn': test_db2,
'nsslapd-suffix': test_suffix2})
# Create the top of the tree
suffix1 = Domain(topo.standalone, test_suffix1)
test1 = suffix1.create(properties={'dc': 'test1'})
suffix2 = Domain(topo.standalone, test_suffix2)
test2 = suffix2.create(properties={'dc': 'test2'})
log.info("Enables attribute encryption for telephoneNumber in test_backend1")
backend1_encrypt_attrs = test_backend1.get_encrypted_attrs()
b1_encrypt = backend1_encrypt_attrs.create(properties={'cn': 'telephoneNumber',
'nsEncryptionAlgorithm': 'AES'})
log.info("Add a test user with telephoneNumber in both backends")
users = UserAccounts(topo.standalone, test1.dn, None)
test_user = users.create(properties=TEST_USER_PROPERTIES)
test_user.replace('telephoneNumber', '1234567890')
users = UserAccounts(topo.standalone, test2.dn, None)
test_user = users.create(properties=TEST_USER_PROPERTIES)
test_user.replace('telephoneNumber', '1234567890')
log.info("Export data as ciphertext from both backends")
export_db1 = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_db1.ldif")
export_db2 = os.path.join(topo.standalone.ds_paths.ldif_dir, "export_db2.ldif")
# Offline export
topo.standalone.stop()
if not topo.standalone.db2ldif(bename=test_db1, suffixes=(test_suffix1,),
excludeSuffixes=None, encrypt=False, repl_data=None, outputfile=export_db1):
log.fatal('Failed to run offline db2ldif')
assert False
if not topo.standalone.db2ldif(bename=test_db2, suffixes=(test_suffix2,),
excludeSuffixes=None, encrypt=False, repl_data=None, outputfile=export_db2):
log.fatal('Failed to run offline db2ldif')
assert False
topo.standalone.start()
log.info("Check that the attribute is present in the exported file in db1")
log.info("Check that the encrypted value of attribute is not present in the exported file in db1")
with open(export_db1, 'r') as ldif_file:
ldif = ldif_file.read()
assert 'telephoneNumber' in ldif
assert 'telephoneNumber: 1234567890' not in ldif
log.info("Check that the attribute is present in the exported file in db2")
log.info("Check that the value of attribute is also present in the exported file in db2")
with open(export_db2, 'r') as ldif_file:
ldif = ldif_file.read()
assert 'telephoneNumber' in ldif
assert 'telephoneNumber: 1234567890' in ldif
log.info("Delete test backends")
test_backend1.delete()
test_backend2.delete()
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s %s" % CURRENT_FILE)
......@@ -889,7 +889,7 @@ adds nsslapd-return-default-opattr attr with value of one operation attribute.
@pytest.fixture(scope="module")
def test_users(topology_st):
def create_users(topology_st):
"""Add users to the default suffix
"""
......@@ -910,7 +910,7 @@ def test_users(topology_st):
})
def test_basic_anonymous_search(topology_st, test_users):
def test_basic_anonymous_search(topology_st, create_users):
"""Tests basic anonymous search operations
:id: c7831e04-f458-4e50-83c7-b6f77109f639
......@@ -931,7 +931,7 @@ def test_basic_anonymous_search(topology_st, test_users):
@pytest.mark.ds604
@pytest.mark.bz915801
def test_search_original_type(topology_st, test_users):
def test_search_original_type(topology_st, create_users):
"""Test ldapsearch returning original attributes
using nsslapd-search-return-original-type-switch
......@@ -1016,6 +1016,78 @@ def test_connection_buffer_size(topology_st):
with pytest.raises(ldap.OPERATIONS_ERROR):
topology_st.standalone.config.replace('nsslapd-connection-buffer', value)
@pytest.mark.bz1637439
def test_critical_msg_on_empty_range_idl(topology_st):
"""Doing a range index lookup should not report a critical message even if IDL is empty
:id: a07a2222-0551-44a6-b113-401d23799364
:setup: Standalone instance
:steps:
1. Create an index for internationalISDNNumber. (attribute chosen because it is
unlikely that previous tests used it)
2. telephoneNumber being indexed by default create 20 users without telephoneNumber
3. add a telephoneNumber value and delete it to trigger an empty index database
4. Do a search that triggers a range lookup on empty telephoneNumber
5. Check that the critical message is not logged in error logs
:expectedresults:
1. This should pass
2. This should pass
3. This should pass
4. This should pass on normal build but could abort a debug build
4. This should pass
"""
indexedAttr = 'internationalISDNNumber'
# Step 1
from lib389.index import Indexes
indexes = Indexes(topology_st.standalone)
indexes.create(properties={
'cn': indexedAttr,
'nsSystemIndex': 'false',
'nsIndexType': 'eq'
})
topology_st.standalone.restart()
# Step 2
users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX)
log.info('Adding 20 users without "%s"' % indexedAttr)
for i in range(20):
name = 'user_%d' % i
last_user = users.create(properties={
'uid': name,
'sn': name,
'cn': name,
'uidNumber': '1000',
'gidNumber': '1000',
'homeDirectory': '/home/%s' % name,
'mail': '%s@example.com' % name,
'userpassword': 'pass%s' % name,
})
# Step 3
# required update to create the indexAttr (i.e. 'loginShell') database, and then make it empty
topology_st.standalone.modify_s(last_user.dn, [(ldap.MOD_ADD, indexedAttr, b'1234')])
ent = topology_st.standalone.getEntry(last_user.dn, ldap.SCOPE_BASE,)
assert ent
assert ent.hasAttr(indexedAttr)
topology_st.standalone.modify_s(last_user.dn, [(ldap.MOD_DELETE, indexedAttr, None)])
ent = topology_st.standalone.getEntry(last_user.dn, ldap.SCOPE_BASE,)
assert ent
assert not ent.hasAttr(indexedAttr)
# Step 4
# The first component being not indexed the range on second is evaluated
try:
ents = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, '(&(sudoNotAfter=*)(%s>=111))' % indexedAttr)
assert len(ents) == 0
except ldap.SERVER_DOWN:
log.error('Likely testing against a debug version that asserted')
pass
# Step 5
assert not topology_st.standalone.searchErrorsLog('CRIT - list_candidates - NULL idl was recieved from filter_candidates_ext.')
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
......
......@@ -223,6 +223,88 @@ def test_filter_with_attribute_subtype(topology_st):
log.info('Testcase PASSED')
@pytest.mark.bz1615155
def test_extended_search(topology_st):
"""Test we can search with equality extended matching rule
:id:
:setup: Standalone instance
:steps:
1. Add a test user with 'sn: ext-test-entry'
2. Search '(cn:de:=ext-test-entry)'
3. Search '(sn:caseIgnoreIA5Match:=EXT-TEST-ENTRY)'
4. Search '(sn:caseIgnoreMatch:=EXT-TEST-ENTRY)'
5. Search '(sn:caseExactMatch:=EXT-TEST-ENTRY)'
6. Search '(sn:caseExactMatch:=ext-test-entry)'
7. Search '(sn:caseExactIA5Match:=EXT-TEST-ENTRY)'
8. Search '(sn:caseExactIA5Match:=ext-test-entry)'
:expectedresults:
1. This should pass
2. This should return one entry
3. This should return one entry
4. This should return one entry
5. This should return NO entry
6. This should return one entry
7. This should return NO entry
8. This should return one entry
3. return one entry
"""
log.info('Running test_filter_escaped...')
ATTR_VAL = 'ext-test-entry'
USER1_DN = "uid=%s,%s" % (ATTR_VAL, DEFAULT_SUFFIX)
try:
topology_st.standalone.add_s(Entry((USER1_DN, {'objectclass': "top extensibleObject".split(),
'sn': ATTR_VAL.encode(),
'cn': ATTR_VAL.encode(),
'uid': ATTR_VAL.encode()})))
except ldap.LDAPError as e:
log.fatal('test_extended_search: Failed to add test user ' + USER1_DN + ': error ' +
e.message['desc'])
assert False
# filter: '(cn:de:=ext-test-entry)'
myfilter = '(cn:de:=%s)' % ATTR_VAL
topology_st.standalone.log.info("Try to search with filter %s" % myfilter)
ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter)
assert len(ents) == 1
# filter: '(sn:caseIgnoreIA5Match:=EXT-TEST-ENTRY)'
myfilter = '(cn:caseIgnoreIA5Match:=%s)' % ATTR_VAL.upper()
topology_st.standalone.log.info("Try to search with filter %s" % myfilter)
ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter)
assert len(ents) == 1
# filter: '(sn:caseIgnoreMatch:=EXT-TEST-ENTRY)'
myfilter = '(cn:caseIgnoreMatch:=%s)' % ATTR_VAL.upper()
topology_st.standalone.log.info("Try to search with filter %s" % myfilter)
ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter)
assert len(ents) == 1
# filter: '(sn:caseExactMatch:=EXT-TEST-ENTRY)'
myfilter = '(cn:caseExactMatch:=%s)' % ATTR_VAL.upper()
topology_st.standalone.log.info("Try to search with filter %s" % myfilter)
ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter)
assert len(ents) == 0
# filter: '(sn:caseExactMatch:=ext-test-entry)'
myfilter = '(cn:caseExactMatch:=%s)' % ATTR_VAL
topology_st.standalone.log.info("Try to search with filter %s" % myfilter)
ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter)
assert len(ents) == 1
# filter: '(sn:caseExactIA5Match:=EXT-TEST-ENTRY)'
myfilter = '(cn:caseExactIA5Match:=%s)' % ATTR_VAL.upper()
topology_st.standalone.log.info("Try to search with filter %s" % myfilter)
ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter)
assert len(ents) == 0
# filter: '(sn:caseExactIA5Match:=ext-test-entry)'
myfilter = '(cn:caseExactIA5Match:=%s)' % ATTR_VAL
topology_st.standalone.log.info("Try to search with filter %s" % myfilter)
ents = topology_st.standalone.search_s(SUFFIX, ldap.SCOPE_SUBTREE, myfilter)
assert len(ents) == 1
if __name__ == '__main__':
# Run isolated
......
......@@ -63,7 +63,7 @@ TEST_PARAMS = [(DN_ROOT, False, [
@pytest.fixture(scope="module")
def test_user(topology_st):
def create_user(topology_st):
"""User for binding operation"""
users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX)
......@@ -84,7 +84,10 @@ def user_aci(topology_st):
under whole suffix
"""
ACI_BODY = ensure_bytes('(targetattr= "objectClass || cn || sn || mail || uid || uidNumber || gidNumber || homeDirectory || creatorsName || createTimestamp || modifyTimestamp || nsUniqueId || parentid || entryid || entrydn || ou || numSubordinates")(version 3.0; acl "Allow read for user"; allow (read,search,compare) userdn = "ldap:///%s";)' % TEST_USER_DN)
ACI_TARGET = '(targetattr= "modifiersName")'
ACI_RULE = ('(version 3.0; acl "Deny modifiersName for user"; deny (read)'
' userdn = "ldap:///%s";)' % TEST_USER_DN)
ACI_BODY = ensure_bytes(ACI_TARGET + ACI_RULE)
topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', ACI_BODY)])
......@@ -111,7 +114,7 @@ def test_supported_features(topology_st):
@pytest.mark.parametrize('add_attr', ['', '*', 'objectClass'])
@pytest.mark.parametrize('search_suffix,regular_user,oper_attr_list',
TEST_PARAMS)
def test_search_basic(topology_st, test_user, user_aci, add_attr,
def test_search_basic(topology_st, create_user, user_aci, add_attr,
search_suffix, regular_user, oper_attr_list):
"""Verify that you can get all expected operational attributes
by a Search Request [RFC2251] with '+' (ASCII 43) filter.
......@@ -142,27 +145,20 @@ def test_search_basic(topology_st, test_user, user_aci, add_attr,
topology_st.standalone.simple_bind_s(DN_DM, ensure_bytes(PASSWORD))
search_filter = ['+']
expected_attrs = oper_attr_list
if add_attr:
search_filter.append(add_attr)
expected_attrs = sorted(oper_attr_list + ['objectClass'])
else:
expected_attrs = sorted(oper_attr_list)
expected_attrs += ['objectClass']
log.info("suffix: %s filter: %s" % (search_suffix, search_filter))
entries = topology_st.standalone.search_s(search_suffix, ldap.SCOPE_BASE,
'(objectclass=*)',
search_filter)
log.info("results: %s" % entries)
assert len(entries) > 0
found_attrs = sorted(entries[0].data.keys())
found_attrs = entries[0].data.keys()
if add_attr == '*':
# Check that found attrs contain both operational
# and non-operational attributes
assert all(attr in found_attrs
for attr in ['objectClass', expected_attrs[0]])
assert set(expected_attrs) - set(found_attrs) == set()
else:
assert set(expected_attrs).issubset(set(found_attrs))
assert set(expected_attrs) == set(found_attrs)
if __name__ == '__main__':
......
......@@ -48,7 +48,7 @@ IP_ADDRESS = socket.gethostbyname(HOSTNAME)
@pytest.fixture(scope="module")
def test_user(topology_st, request):
def create_user(topology_st, request):
"""User for binding operation"""
log.info('Adding user simplepaged_test')
......@@ -221,7 +221,7 @@ def paged_search(conn, suffix, controls, search_flt, searchreq_attrlist):
@pytest.mark.parametrize("page_size,users_num", [(6, 5), (5, 5), (5, 25)])
def test_search_success(topology_st, test_user, page_size, users_num):
def test_search_success(topology_st, create_user, page_size, users_num):
"""Verify that search with a simple paged results control
returns all entries it should without errors.
......@@ -241,8 +241,8 @@ def test_search_success(topology_st, test_user, page_size, users_num):
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
log.info('Set user bind %s ' % test_user)
conn = test_user.bind(TEST_USER_PWD)
log.info('Set user bind %s ' % create_user)
conn = create_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
all_results = paged_search(conn, DEFAULT_SUFFIX, [req_ctrl], search_flt, searchreq_attrlist)
......@@ -264,7 +264,7 @@ def test_search_success(topology_st, test_user, page_size, users_num):
ldap.SIZELIMIT_EXCEEDED),
(5, 50, 'cn=config,%s' % DN_LDBM, 'nsslapd-lookthroughlimit', '20',
ldap.ADMINLIMIT_EXCEEDED)])
def test_search_limits_fail(topology_st, test_user, page_size, users_num,
def test_search_limits_fail(topology_st, create_user, page_size, users_num,
suffix, attr_name, attr_value, expected_err):
"""Verify that search with a simple paged results control
throws expected exceptoins when corresponding limits are
......@@ -293,7 +293,7 @@ def test_search_limits_fail(topology_st, test_user, page_size, users_num,
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -343,7 +343,7 @@ def test_search_limits_fail(topology_st, test_user, page_size, users_num,
change_conf_attr(topology_st, suffix, attr_name, attr_value_bck)
def test_search_sort_success(topology_st, test_user):
def test_search_sort_success(topology_st, create_user):
"""Verify that search with a simple paged results control
and a server side sort control returns all entries
it should without errors.
......@@ -367,7 +367,7 @@ def test_search_sort_success(topology_st, test_user):
searchreq_attrlist = ['dn', 'sn']
try:
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
sort_ctrl = SSSRequestControl(True, ['sn'])
......@@ -388,7 +388,7 @@ def test_search_sort_success(topology_st, test_user):
del_users(users_list)
def test_search_abandon(topology_st, test_user):
def test_search_abandon(topology_st, create_user):
"""Verify that search with simple paged results control
can be abandon
......@@ -414,7 +414,7 @@ def test_search_abandon(topology_st, test_user):
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -433,7 +433,7 @@ def test_search_abandon(topology_st, test_user):
del_users(users_list)
def test_search_with_timelimit(topology_st, test_user):
def test_search_with_timelimit(topology_st, create_user):
"""Verify that after performing multiple simple paged searches
to completion, each with a timelimit, it wouldn't fail, if we sleep
for a time more than the timelimit.
......@@ -463,7 +463,7 @@ def test_search_with_timelimit(topology_st, test_user):
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -506,7 +506,7 @@ def test_search_with_timelimit(topology_st, test_user):
@pytest.mark.parametrize('aci_subject',
('dns = "{}"'.format(HOSTNAME),
'ip = "{}"'.format(IP_ADDRESS)))
def test_search_dns_ip_aci(topology_st, test_user, aci_subject):
def test_search_dns_ip_aci(topology_st, create_user, aci_subject):
"""Verify that after performing multiple simple paged searches
to completion on the suffix with DNS or IP based ACI
......@@ -549,7 +549,7 @@ def test_search_dns_ip_aci(topology_st, test_user, aci_subject):
ACI_BODY = ensure_bytes(ACI_TARGET + ACI_ALLOW + ACI_SUBJECT)
topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_REPLACE, 'aci', ACI_BODY)])
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -572,7 +572,7 @@ def test_search_dns_ip_aci(topology_st, test_user, aci_subject):
del_users(users_list)
def test_search_multiple_paging(topology_st, test_user):
def test_search_multiple_paging(topology_st, create_user):
"""Verify that after performing multiple simple paged searches
on a single connection without a complition, it wouldn't fail.
......@@ -599,7 +599,7 @@ def test_search_multiple_paging(topology_st, test_user):
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -625,7 +625,7 @@ def test_search_multiple_paging(topology_st, test_user):
@pytest.mark.parametrize("invalid_cookie", [1000, -1])
def test_search_invalid_cookie(topology_st, test_user, invalid_cookie):
def test_search_invalid_cookie(topology_st, create_user, invalid_cookie):
"""Verify that using invalid cookie while performing
search with the simple paged results control throws
a TypeError exception
......@@ -653,7 +653,7 @@ def test_search_invalid_cookie(topology_st, test_user, invalid_cookie):
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -673,7 +673,7 @@ def test_search_invalid_cookie(topology_st, test_user, invalid_cookie):
del_users(users_list)
def test_search_abandon_with_zero_size(topology_st, test_user):
def test_search_abandon_with_zero_size(topology_st, create_user):
"""Verify that search with simple paged results control
can be abandon using page_size = 0
......@@ -697,7 +697,7 @@ def test_search_abandon_with_zero_size(topology_st, test_user):
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -716,7 +716,7 @@ def test_search_abandon_with_zero_size(topology_st, test_user):
del_users(users_list)
def test_search_pagedsizelimit_success(topology_st, test_user):
def test_search_pagedsizelimit_success(topology_st, create_user):
"""Verify that search with a simple paged results control
returns all entries it should without errors while
valid value set to nsslapd-pagedsizelimit.
......@@ -746,7 +746,7 @@ def test_search_pagedsizelimit_success(topology_st, test_user):
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
......@@ -763,7 +763,7 @@ def test_search_pagedsizelimit_success(topology_st, test_user):
@pytest.mark.parametrize('conf_attr,user_attr,expected_rs',
(('5', '15', 'PASS'), ('15', '5', ldap.SIZELIMIT_EXCEEDED)))
def test_search_nspagedsizelimit(topology_st, test_user,
def test_search_nspagedsizelimit(topology_st, create_user,
conf_attr, user_attr, expected_rs):
"""Verify that nsPagedSizeLimit attribute overrides
nsslapd-pagedsizelimit while performing search with
......@@ -804,11 +804,11 @@ def test_search_nspagedsizelimit(topology_st, test_user,
search_flt = r'(uid=test*)'
searchreq_attrlist = ['dn', 'sn']
conf_attr_bck = change_conf_attr(topology_st, DN_CONFIG, 'nsslapd-pagedsizelimit', conf_attr)
user_attr_bck = change_conf_attr(topology_st, test_user.dn, 'nsPagedSizeLimit', user_attr)
user_attr_bck = change_conf_attr(topology_st, create_user.dn, 'nsPagedSizeLimit', user_attr)
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
......@@ -826,13 +826,13 @@ def test_search_nspagedsizelimit(topology_st, test_user,
finally:
del_users(users_list)
change_conf_attr(topology_st, DN_CONFIG, 'nsslapd-pagedsizelimit', conf_attr_bck)
change_conf_attr(topology_st, test_user.dn, 'nsPagedSizeLimit', user_attr_bck)
change_conf_attr(topology_st, create_user.dn, 'nsPagedSizeLimit', user_attr_bck)
@pytest.mark.parametrize('conf_attr_values,expected_rs',
((('5000', '100', '100'), ldap.ADMINLIMIT_EXCEEDED),
(('5000', '120', '122'), 'PASS')))
def test_search_paged_limits(topology_st, test_user, conf_attr_values, expected_rs):
def test_search_paged_limits(topology_st, create_user, conf_attr_values, expected_rs):
"""Verify that nsslapd-idlistscanlimit and
nsslapd-lookthroughlimit can limit the administrator
search abilities.
......@@ -879,7 +879,7 @@ def test_search_paged_limits(topology_st, test_user, conf_attr_values, expected_
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
......@@ -904,7 +904,7 @@ def test_search_paged_limits(topology_st, test_user, conf_attr_values, expected_
@pytest.mark.parametrize('conf_attr_values,expected_rs',
((('1000', '100', '100'), ldap.ADMINLIMIT_EXCEEDED),
(('1000', '120', '122'), 'PASS')))
def test_search_paged_user_limits(topology_st, test_user, conf_attr_values, expected_rs):
def test_search_paged_user_limits(topology_st, create_user, conf_attr_values, expected_rs):
"""Verify that nsPagedIDListScanLimit and nsPagedLookthroughLimit
override nsslapd-idlistscanlimit and nsslapd-lookthroughlimit
while performing search with the simple paged results control.
......@@ -947,12 +947,12 @@ def test_search_paged_user_limits(topology_st, test_user, conf_attr_values, expe
searchreq_attrlist = ['dn', 'sn']
lookthrough_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM, 'nsslapd-lookthroughlimit', conf_attr_values[0])
idlistscan_attr_bck = change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM, 'nsslapd-idlistscanlimit', conf_attr_values[0])
user_idlistscan_attr_bck = change_conf_attr(topology_st, test_user.dn, 'nsPagedIDListScanLimit', conf_attr_values[1])
user_lookthrough_attr_bck = change_conf_attr(topology_st, test_user.dn, 'nsPagedLookthroughLimit', conf_attr_values[2])
user_idlistscan_attr_bck = change_conf_attr(topology_st, create_user.dn, 'nsPagedIDListScanLimit', conf_attr_values[1])
user_lookthrough_attr_bck = change_conf_attr(topology_st, create_user.dn, 'nsPagedLookthroughLimit', conf_attr_values[2])
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
controls = [req_ctrl]
......@@ -970,11 +970,11 @@ def test_search_paged_user_limits(topology_st, test_user, conf_attr_values, expe
del_users(users_list)
change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM, 'nsslapd-lookthroughlimit', lookthrough_attr_bck)
change_conf_attr(topology_st, 'cn=config,%s' % DN_LDBM, 'nsslapd-idlistscanlimit', idlistscan_attr_bck)
change_conf_attr(topology_st, test_user.dn, 'nsPagedIDListScanLimit', user_idlistscan_attr_bck)
change_conf_attr(topology_st, test_user.dn, 'nsPagedLookthroughLimit', user_lookthrough_attr_bck)
change_conf_attr(topology_st, create_user.dn, 'nsPagedIDListScanLimit', user_idlistscan_attr_bck)
change_conf_attr(topology_st, create_user.dn, 'nsPagedLookthroughLimit', user_lookthrough_attr_bck)
def test_ger_basic(topology_st, test_user):
def test_ger_basic(topology_st, create_user):
"""Verify that search with a simple paged results control
and get effective rights control returns all entries
it should without errors.
......@@ -1011,7 +1011,7 @@ def test_ger_basic(topology_st, test_user):
del_users(users_list)
def test_multi_suffix_search(topology_st, test_user, new_suffixes):
def test_multi_suffix_search(topology_st, create_user, new_suffixes):
"""Verify that page result search returns empty cookie
if there is no returned entry.
......@@ -1069,7 +1069,7 @@ def test_multi_suffix_search(topology_st, test_user, new_suffixes):
@pytest.mark.parametrize('conf_attr_value', (None, '-1', '1000'))
def test_maxsimplepaged_per_conn_success(topology_st, test_user, conf_attr_value):
def test_maxsimplepaged_per_conn_success(topology_st, create_user, conf_attr_value):
"""Verify that nsslapd-maxsimplepaged-per-conn acts according design
:id: 192e2f25-04ee-4ff9-9340-d875dcbe8011
......@@ -1096,7 +1096,7 @@ def test_maxsimplepaged_per_conn_success(topology_st, test_user, conf_attr_value
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......@@ -1112,7 +1112,7 @@ def test_maxsimplepaged_per_conn_success(topology_st, test_user, conf_attr_value
@pytest.mark.parametrize('conf_attr_value', ('0', '1'))
def test_maxsimplepaged_per_conn_failure(topology_st, test_user, conf_attr_value):
def test_maxsimplepaged_per_conn_failure(topology_st, create_user, conf_attr_value):
"""Verify that nsslapd-maxsimplepaged-per-conn acts according design
:id: eb609e63-2829-4331-8439-a35f99694efa
......@@ -1140,7 +1140,7 @@ def test_maxsimplepaged_per_conn_failure(topology_st, test_user, conf_attr_value
try:
log.info('Set user bind')
conn = test_user.bind(TEST_USER_PWD)
conn = create_user.bind(TEST_USER_PWD)
log.info('Create simple paged results control instance')
req_ctrl = SimplePagedResultsControl(True, size=page_size, cookie='')
......
......@@ -32,7 +32,7 @@ log = logging.getLogger(__name__)
@pytest.fixture(scope="module")
def test_user(topology_st, request):
def create_user(topology_st, request):
"""User for binding operation"""
log.info('Adding user {}'.format(TEST_USER_DN))
......@@ -60,7 +60,7 @@ def test_user(topology_st, request):
@pytest.fixture(scope="module")
def password_policy(topology_st, test_user):
def password_policy(topology_st, create_user):
"""Set up password policy for subtree and user"""
log.info('Enable fine-grained policy')
......@@ -122,7 +122,7 @@ def password_policy(topology_st, test_user):
[('on', 'off', ldap.UNWILLING_TO_PERFORM),
('off', 'off', ldap.UNWILLING_TO_PERFORM),
('off', 'on', None), ('on', 'on', None)])
def test_change_pwd(topology_st, test_user, password_policy,
def test_change_pwd(topology_st, create_user, password_policy,
subtree_pwchange, user_pwchange, exception):
"""Verify that 'passwordChange' attr works as expected
User should have a priority over a subtree.
......@@ -194,7 +194,7 @@ def test_change_pwd(topology_st, test_user, password_policy,
ensure_bytes(TEST_USER_PWD))])
def test_pwd_min_age(topology_st, test_user, password_policy):
def test_pwd_min_age(topology_st, create_user, password_policy):
"""If we set passwordMinAge to some value, for example to 10, then it
should not allow the user to change the password within 10 seconds after
his previous change.
......
......@@ -37,7 +37,7 @@ TEMP_USER_DN = '%s,%s' % (TEMP_USER, OU_PEOPLE)
@pytest.fixture(scope="module")
def test_user(topology_st, request):
def create_user(topology_st, request):
"""User for binding operation"""
log.info('Adding user {}'.format(BN))
......@@ -71,7 +71,7 @@ def test_user(topology_st, request):
@pytest.fixture(scope="module")
def password_policy(topology_st, test_user):
def password_policy(topology_st, create_user):
"""Set global password policy.
Then, set fine-grained subtree level password policy
to ou=People with no password syntax.
......@@ -140,7 +140,7 @@ def check_attr_val(topology_st, dn, attr, expected):
@pytest.mark.parametrize('inherit_value,checksyntax_value',
[('off', 'off'), ('on', 'off'), ('off', 'on')])
def test_entry_has_no_restrictions(topology_st, password_policy, test_user,
def test_entry_has_no_restrictions(topology_st, password_policy, create_user,
inherit_value, checksyntax_value):
"""Make sure an entry added to ou=people has no password syntax restrictions
......@@ -201,7 +201,7 @@ def test_entry_has_no_restrictions(topology_st, password_policy, test_user,
@pytest.mark.parametrize('container', [DN_CONFIG, PWP_CONTAINER_PEOPLE])
def test_entry_has_restrictions(topology_st, password_policy, test_user, container):
def test_entry_has_restrictions(topology_st, password_policy, create_user, container):
"""Set 'nsslapd-pwpolicy-inherit-global: on' and 'passwordCheckSyntax: on'.
Make sure that syntax rules work, if set them at both: cn=config and
ou=people policy container.
......
......@@ -34,7 +34,7 @@ def password_policy(topology_st):
@pytest.fixture(scope="module")
def test_user(topology_st):
def create_user(topology_st):
"""Create the test user."""
topology_st.standalone.add_s(Entry((
......@@ -132,7 +132,7 @@ def tryPassword(inst, policy_attr, value, reset_value, pw_bad, pw_good, msg):
setPolicy(inst, policy_attr, reset_value)
def test_basic(topology_st, test_user, password_policy):
def test_basic(topology_st, create_user, password_policy):
"""Ensure that on a password change, the policy syntax
is enforced correctly.
......@@ -246,7 +246,7 @@ def test_basic(topology_st, test_user, password_policy):
'password123', 'does not contain minimum number of alphas')
# Max Repeats
tryPassword(topology_st.standalone, 'passwordMaxRepeats', 2, 0, 'passsword',
'pasword123', 'too many repeating characters')
'password123', 'too many repeating characters')
# Min Specials
tryPassword(topology_st.standalone, 'passwordMinSpecials', 2, 0, 'passwd',
'password_#$',
......
......@@ -68,7 +68,7 @@ def passw_policy(topo, request):
@pytest.fixture(scope="module")
def test_user(topo, request):
def create_user(topo, request):
"""Add test users using UserAccounts"""
log.info('Adding user-uid={},ou=people,{}'.format(user_data['uid'], SUFFIX))
......@@ -89,7 +89,7 @@ def test_user(topo, request):
return tuser
def test_pwp_local_unlock(topo, passw_policy, test_user):
def test_pwp_local_unlock(topo, passw_policy, create_user):
"""Test subtree policies use the same global default for passwordUnlock
:id: 741a8417-5f65-4012-b9ed-87987ce3ca1b
......@@ -105,12 +105,12 @@ def test_pwp_local_unlock(topo, passw_policy, test_user):
"""
log.info("Verify user can bind...")
test_user.bind(PASSWORD)
create_user.bind(PASSWORD)
log.info('Test passwordUnlock default - user should be able to reset password after lockout')
for i in range(0, 2):
try:
test_user.bind("bad-password")
create_user.bind("bad-password")
except ldap.INVALID_CREDENTIALS:
# expected
pass
......@@ -120,18 +120,18 @@ def test_pwp_local_unlock(topo, passw_policy, test_user):
log.info('Verify account is locked')
with pytest.raises(ldap.CONSTRAINT_VIOLATION):
test_user.bind(PASSWORD)
create_user.bind(PASSWORD)
log.info('Wait for lockout duration...')
time.sleep(4)
log.info('Check if user can now bind with correct password')
test_user.bind(PASSWORD)
create_user.bind(PASSWORD)
@pytest.mark.bz1465600
@pytest.mark.parametrize("user_pasw", TEST_PASSWORDS)
def test_trivial_passw_check(topo, passw_policy, test_user, user_pasw):
def test_trivial_passw_check(topo, passw_policy, create_user, user_pasw):
"""PasswordCheckSyntax attribute fails to validate cn, sn, uid, givenname, ou and mail attributes
:id: bf9fe1ef-56cb-46a3-a6f8-5530398a06dc
......@@ -148,20 +148,20 @@ def test_trivial_passw_check(topo, passw_policy, test_user, user_pasw):
4. Resetting userPassword to cn, sn, uid and mail should be rejected.
"""
conn = test_user.bind(PASSWORD)
conn = create_user.bind(PASSWORD)
try:
log.info('Replace userPassword attribute with {}'.format(user_pasw))
with pytest.raises(ldap.CONSTRAINT_VIOLATION) as excinfo:
conn.modify_s(test_user.dn, [(ldap.MOD_REPLACE, 'userPassword', ensure_bytes(user_pasw))])
conn.modify_s(create_user.dn, [(ldap.MOD_REPLACE, 'userPassword', ensure_bytes(user_pasw))])
log.fatal('Failed: Userpassword with {} is accepted'.format(user_pasw))
assert 'password based off of user entry' in str(excinfo.value)
finally:
conn.unbind_s()
test_user.set('userPassword', PASSWORD)
create_user.set('userPassword', PASSWORD)
@pytest.mark.parametrize("user_pasw", TEST_PASSWORDS)
def test_global_vs_local(topo, passw_policy, test_user, user_pasw):
def test_global_vs_local(topo, passw_policy, create_user, user_pasw):
"""Passwords rejected if its similar to uid, cn, sn, givenname, ou and mail attributes
:id: dfd6cf5d-8bcd-4895-a691-a43ad9ec1be8
......@@ -179,17 +179,17 @@ def test_global_vs_local(topo, passw_policy, test_user, user_pasw):
log.info('Configure Pwpolicy with PasswordCheckSyntax and nsslapd-pwpolicy-local set to off')
topo.standalone.config.set('nsslapd-pwpolicy-local', 'off')
conn = test_user.bind(PASSWORD)
conn = create_user.bind(PASSWORD)
log.info('Replace userPassword attribute with {}'.format(user_pasw))
try:
try:
conn.modify_s(test_user.dn, [(ldap.MOD_REPLACE, 'userPassword', ensure_bytes(user_pasw))])
conn.modify_s(create_user.dn, [(ldap.MOD_REPLACE, 'userPassword', ensure_bytes(user_pasw))])
except ldap.LDAPError as e:
log.fatal('Failed to replace userPassword: error {}'.format(e.message['desc']))
raise e
finally:
conn.unbind_s()
test_user.set('userPassword', PASSWORD)
create_user.set('userPassword', PASSWORD)
if __name__ == '__main__':
......
......@@ -31,7 +31,7 @@ log = logging.getLogger(__name__)
@pytest.fixture(scope="function")
def test_entry(topo_m4, request):
def create_entry(topo_m4, request):
"""Add test entry to master1"""
log.info('Adding entry {}'.format(TEST_ENTRY_DN))
......@@ -79,7 +79,7 @@ def new_suffix(topo_m4, request):
request.addfinalizer(fin)
def test_add_entry(topo_m4, test_entry):
def test_add_entry(topo_m4, create_entry):
"""Check that entries are replicated after add operation
:id: 024250f1-5f7e-4f3b-a9f5-27741e6fd405
......@@ -94,7 +94,7 @@ def test_add_entry(topo_m4, test_entry):
assert all(entries), "Entry {} wasn't replicated successfully".format(TEST_ENTRY_DN)
def test_modify_entry(topo_m4, test_entry):
def test_modify_entry(topo_m4, create_entry):
"""Check that entries are replicated after modify operation
:id: 36764053-622c-43c2-a132-d7a3ab7d9aaa
......@@ -148,7 +148,7 @@ def test_modify_entry(topo_m4, test_entry):
assert "{}@greenhat.com".format(TEST_ENTRY_NAME) not in u.get_attr_vals_utf8('mail')
def test_delete_entry(topo_m4, test_entry):
def test_delete_entry(topo_m4, create_entry):
"""Check that entry deletion is replicated after delete operation
:id: 18437262-9d6a-4b98-a47a-6182501ab9bc
......@@ -169,7 +169,7 @@ def test_delete_entry(topo_m4, test_entry):
@pytest.mark.parametrize("delold", [0, 1])
def test_modrdn_entry(topo_m4, test_entry, delold):
def test_modrdn_entry(topo_m4, create_entry, delold):
"""Check that entries are replicated after modrdn operation
:id: 02558e6d-a745-45ae-8d88-34fe9b16adc9
......@@ -324,7 +324,7 @@ def test_new_suffix(topo_m4, new_suffix):
repl.remove_master(m1)
repl.remove_master(m2)
def test_many_attrs(topo_m4, test_entry):
def test_many_attrs(topo_m4, create_entry):
"""Check a replication with many attributes (add and delete)
:id: d540b358-f67a-43c6-8df5-7c74b3cb7523
......@@ -366,7 +366,7 @@ def test_many_attrs(topo_m4, test_entry):
assert value not in delete_list
def test_double_delete(topo_m4, test_entry):
def test_double_delete(topo_m4, create_entry):
"""Check that double delete of the entry doesn't crash server
:ID: 3496c82d-636a-48c9-973c-2455b12164cc
......@@ -392,7 +392,7 @@ def test_double_delete(topo_m4, test_entry):
assert not entries, "Entry deletion {} wasn't replicated successfully".format(TEST_ENTRY_DN)
def test_password_repl_error(topo_m4, test_entry):
def test_password_repl_error(topo_m4, create_entry):
"""Check that error about userpassword replication is properly logged
:ID: 714130ff-e4f0-4633-9def-c1f4b24abfef
......
......@@ -114,7 +114,7 @@ def _test_base(topology):
M1 = topology.ms["master1"]
conts = nsContainers(M1, SUFFIX)
test_base = conts.create(properties={'cn': 'test_container'})
base_m2 = conts.create(properties={'cn': 'test_container'})
for inst in topology:
inst.config.loglevel([ErrorLog.DEFAULT, ErrorLog.REPLICA], service='error')
......@@ -123,13 +123,13 @@ def _test_base(topology):
inst.config.enable_log('audit')
inst.restart()
return test_base
return base_m2
def _delete_test_base(inst, test_base_dn):
def _delete_test_base(inst, base_m2_dn):
"""Delete test container with entries and entry conflicts"""
ents = inst.search_s(test_base_dn, ldap.SCOPE_SUBTREE, filterstr="(|(objectclass=*)(objectclass=ldapsubentry))")
ents = inst.search_s(base_m2_dn, ldap.SCOPE_SUBTREE, filterstr="(|(objectclass=*)(objectclass=ldapsubentry))")
for ent in sorted(ents, key=lambda e: len(e.dn), reverse=True):
log.debug("Delete entry children {}".format(ent.dn))
......@@ -140,7 +140,7 @@ def _delete_test_base(inst, test_base_dn):
@pytest.fixture
def test_base(topology_m2, request):
def base_m2(topology_m2, request):
tb = _test_base(topology_m2)
def fin():
......@@ -152,7 +152,7 @@ def test_base(topology_m2, request):
@pytest.fixture
def test_base_m3(topology_m3, request):
def base_m3(topology_m3, request):
tb = _test_base(topology_m3)
def fin():
......@@ -164,7 +164,7 @@ def test_base_m3(topology_m3, request):
class TestTwoMasters:
def test_add_modrdn(self, topology_m2, test_base):
def test_add_modrdn(self, topology_m2, base_m2):
"""Check that conflict properly resolved for create - modrdn operations
:id: 77f09b18-03d1-45da-940b-1ad2c2908ebb
......@@ -194,8 +194,8 @@ class TestTwoMasters:
M1 = topology_m2.ms["master1"]
M2 = topology_m2.ms["master2"]
test_users_m1 = UserAccounts(M1, test_base.dn, rdn=None)
test_users_m2 = UserAccounts(M2, test_base.dn, rdn=None)
test_users_m1 = UserAccounts(M1, base_m2.dn, rdn=None)
test_users_m2 = UserAccounts(M2, base_m2.dn, rdn=None)
repl = ReplicationManager(SUFFIX)
for user_num in range(1000, 1005):
......@@ -233,7 +233,7 @@ class TestTwoMasters:
user_dns_m2 = [user.dn for user in test_users_m2.list()]
assert set(user_dns_m1) == set(user_dns_m2)
def test_complex_add_modify_modrdn_delete(self, topology_m2, test_base):
def test_complex_add_modify_modrdn_delete(self, topology_m2, base_m2):
"""Check that conflict properly resolved for complex operations
which involve add, modify, modrdn and delete
......@@ -269,8 +269,8 @@ class TestTwoMasters:
M1 = topology_m2.ms["master1"]
M2 = topology_m2.ms["master2"]
test_users_m1 = UserAccounts(M1, test_base.dn, rdn=None)
test_users_m2 = UserAccounts(M2, test_base.dn, rdn=None)
test_users_m1 = UserAccounts(M1, base_m2.dn, rdn=None)
test_users_m2 = UserAccounts(M2, base_m2.dn, rdn=None)
repl = ReplicationManager(SUFFIX)
for user_num in range(1100, 1110):
......@@ -365,7 +365,7 @@ class TestTwoMasters:
user_dns_m2 = [user.dn for user in test_users_m2.list()]
assert set(user_dns_m1) == set(user_dns_m2)
def test_memberof_groups(self, topology_m2, test_base):
def test_memberof_groups(self, topology_m2, base_m2):
"""Check that conflict properly resolved for operations
with memberOf and groups
......@@ -400,9 +400,9 @@ class TestTwoMasters:
M1 = topology_m2.ms["master1"]
M2 = topology_m2.ms["master2"]
test_users_m1 = UserAccounts(M1, test_base.dn, rdn=None)
test_groups_m1 = Groups(M1, test_base.dn, rdn=None)
test_groups_m2 = Groups(M2, test_base.dn, rdn=None)
test_users_m1 = UserAccounts(M1, base_m2.dn, rdn=None)
test_groups_m1 = Groups(M1, base_m2.dn, rdn=None)
test_groups_m2 = Groups(M2, base_m2.dn, rdn=None)
repl = ReplicationManager(SUFFIX)
......@@ -539,7 +539,7 @@ class TestTwoMasters:
user_dns_m2 = [user.dn for user in test_users_m2.list()]
assert set(user_dns_m1) == set(user_dns_m2)
def test_nested_entries_with_children(self, topology_m2, test_base):
def test_nested_entries_with_children(self, topology_m2, base_m2):
"""Check that conflict properly resolved for operations
with nested entries with children
......@@ -583,14 +583,14 @@ class TestTwoMasters:
M1 = topology_m2.ms["master1"]
M2 = topology_m2.ms["master2"]
repl = ReplicationManager(SUFFIX)
test_users_m1 = UserAccounts(M1, test_base.dn, rdn=None)
test_users_m2 = UserAccounts(M2, test_base.dn, rdn=None)
test_users_m1 = UserAccounts(M1, base_m2.dn, rdn=None)
test_users_m2 = UserAccounts(M2, base_m2.dn, rdn=None)
_create_user(test_users_m1, 4000)
_create_user(test_users_m1, 4001)
cont_list = []
for num in range(15):
cont = _create_container(M1, test_base.dn, 'sub{}'.format(num))
cont = _create_container(M1, base_m2.dn, 'sub{}'.format(num))
cont_list.append(cont)
repl.test_replication(M1, M2)
......@@ -598,20 +598,20 @@ class TestTwoMasters:
topology_m2.pause_all_replicas()
log.info("Create parent-child on master2 and master1")
_create_container(M2, test_base.dn, 'p0', sleep=True)
cont_p = _create_container(M1, test_base.dn, 'p0', sleep=True)
_create_container(M2, base_m2.dn, 'p0', sleep=True)
cont_p = _create_container(M1, base_m2.dn, 'p0', sleep=True)
_create_container(M1, cont_p.dn, 'c0', sleep=True)
_create_container(M2, cont_p.dn, 'c0', sleep=True)
log.info("Create parent-child on master1 and master2")
cont_p = _create_container(M1, test_base.dn, 'p1', sleep=True)
_create_container(M2, test_base.dn, 'p1', sleep=True)
cont_p = _create_container(M1, base_m2.dn, 'p1', sleep=True)
_create_container(M2, base_m2.dn, 'p1', sleep=True)
_create_container(M1, cont_p.dn, 'c1', sleep=True)
_create_container(M2, cont_p.dn, 'c1', sleep=True)
log.info("Create parent-child on master1 and master2 different child rdn")
cont_p = _create_container(M1, test_base.dn, 'p2', sleep=True)
_create_container(M2, test_base.dn, 'p2', sleep=True)
cont_p = _create_container(M1, base_m2.dn, 'p2', sleep=True)
_create_container(M2, base_m2.dn, 'p2', sleep=True)
_create_container(M1, cont_p.dn, 'c2', sleep=True)
_create_container(M2, cont_p.dn, 'c3', sleep=True)
......@@ -746,7 +746,7 @@ class TestTwoMasters:
for num in range(1, 3):
inst = topology_m2.ms["master{}".format(num)]
conts_dns[inst.serverid] = []
conts = nsContainers(inst, test_base.dn)
conts = nsContainers(inst, base_m2.dn)
for cont in conts.list():
conts_p = nsContainers(inst, cont.dn)
for cont_p in conts_p.list():
......@@ -763,7 +763,7 @@ class TestTwoMasters:
class TestThreeMasters:
def test_nested_entries(self, topology_m3, test_base_m3):
def test_nested_entries(self, topology_m3, base_m3):
"""Check that conflict properly resolved for operations
with nested entries with children
......@@ -800,7 +800,7 @@ class TestThreeMasters:
cont_list = []
for num in range(11):
cont = _create_container(M1, test_base_m3.dn, 'sub{}'.format(num))
cont = _create_container(M1, base_m3.dn, 'sub{}'.format(num))
cont_list.append(cont)
repl.test_replication(M1, M2)
......@@ -858,7 +858,7 @@ class TestThreeMasters:
for num in range(1, 4):
inst = topology_m3.ms["master{}".format(num)]
conts_dns[inst.serverid] = []
conts = nsContainers(inst, test_base_m3.dn)
conts = nsContainers(inst, base_m3.dn)
for cont in conts.list():
conts_p = nsContainers(inst, cont.dn)
for cont_p in conts_p.list():
......@@ -876,5 +876,3 @@ if __name__ == '__main__':
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s %s" % CURRENT_FILE)
......@@ -72,7 +72,7 @@ def pattern_errorlog(file, log_pattern, start_location=0):
return count
@pytest.fixture()
def test_entry(topo_m2, request):
def create_entry(topo_m2, request):
"""Add test entry using UserAccounts"""
log.info('Adding a test entry user')
......@@ -81,7 +81,7 @@ def test_entry(topo_m2, request):
return tuser
def test_double_delete(topo_m2, test_entry):
def test_double_delete(topo_m2, create_entry):
"""Check that double delete of the entry doesn't crash server
:id: 3496c82d-636a-48c9-973c-2455b12164cc
......@@ -103,11 +103,11 @@ def test_double_delete(topo_m2, test_entry):
repl.disable_to_master(m1, [m2])
repl.disable_to_master(m2, [m1])
log.info('Deleting entry {} from master1'.format(test_entry.dn))
topo_m2.ms["master1"].delete_s(test_entry.dn)
log.info('Deleting entry {} from master1'.format(create_entry.dn))
topo_m2.ms["master1"].delete_s(create_entry.dn)
log.info('Deleting entry {} from master2'.format(test_entry.dn))
topo_m2.ms["master2"].delete_s(test_entry.dn)
log.info('Deleting entry {} from master2'.format(create_entry.dn))
topo_m2.ms["master2"].delete_s(create_entry.dn)
repl.enable_to_master(m2, [m1])
repl.enable_to_master(m1, [m2])
......@@ -201,7 +201,7 @@ def test_repl_modrdn(topo_m2):
def test_password_repl_error(topo_m2, test_entry):
def test_password_repl_error(topo_m2, create_entry):
"""Check that error about userpassword replication is properly logged
:id: 714130ff-e4f0-4633-9def-c1f4b24abfef
......@@ -226,9 +226,9 @@ def test_password_repl_error(topo_m2, test_entry):
log.info('Set replication loglevel')
m2.config.loglevel((ErrorLog.REPLICA,))
log.info('Modifying entry {} - change userpassword on master 1'.format(test_entry.dn))
log.info('Modifying entry {} - change userpassword on master 1'.format(create_entry.dn))
test_entry.set('userpassword', TEST_ENTRY_NEW_PASS)
create_entry.set('userpassword', TEST_ENTRY_NEW_PASS)
repl = ReplicationManager(DEFAULT_SUFFIX)
repl.wait_for_replication(m1, m2)
......@@ -239,11 +239,11 @@ def test_password_repl_error(topo_m2, test_entry):
try:
log.info('Check that password works on master 2')
test_entry_m2 = UserAccount(m2, test_entry.dn)
test_entry_m2.bind(TEST_ENTRY_NEW_PASS)
create_entry_m2 = UserAccount(m2, create_entry.dn)
create_entry_m2.bind(TEST_ENTRY_NEW_PASS)
log.info('Check the error log for the error with {}'.format(test_entry.dn))
assert not m2.ds_error_log.match('.*can.t add a change for {}.*'.format(test_entry.dn))
log.info('Check the error log for the error with {}'.format(create_entry.dn))
assert not m2.ds_error_log.match('.*can.t add a change for {}.*'.format(create_entry.dn))
finally:
log.info('Set the default loglevel')
m2.config.loglevel((ErrorLog.DEFAULT,))
......@@ -340,7 +340,7 @@ def test_fetch_bindDnGroup(topo_m2):
users = UserAccounts(M1, PEOPLE, rdn=None)
user_props = TEST_USER_PROPERTIES.copy()
user_props.update({'uid': uid, 'cn': uid, 'sn': '_%s' % uid, 'userpassword': PASSWD.encode(), 'description': b'value creation'})
test_user = users.create(properties=user_props)
create_user = users.create(properties=user_props)
groups_M1 = Groups(M1, DEFAULT_SUFFIX)
group_properties = {
......@@ -348,7 +348,7 @@ def test_fetch_bindDnGroup(topo_m2):
'description' : 'testgroup'}
group_M1 = groups_M1.create(properties=group_properties)
group_M2 = Group(M2, group_M1.dn)
assert(not group_M1.is_member(test_user.dn))
assert(not group_M1.is_member(create_user.dn))
......@@ -377,7 +377,7 @@ def test_fetch_bindDnGroup(topo_m2):
for inst in (M1, M2):
agmts = Agreements(inst)
agmt = agmts.list()[0]
agmt.replace('nsDS5ReplicaBindDN', test_user.dn.encode())
agmt.replace('nsDS5ReplicaBindDN', create_user.dn.encode())
agmt.replace('nsds5ReplicaCredentials', PASSWD.encode())
......@@ -393,18 +393,18 @@ def test_fetch_bindDnGroup(topo_m2):
# Replication being broken here we need to directly do the same update.
# Sorry not found another solution except total update
group_M1.add_member(test_user.dn)
group_M2.add_member(test_user.dn)
group_M1.add_member(create_user.dn)
group_M2.add_member(create_user.dn)
topo_m2.resume_all_replicas()
# trigger updates to be sure to have a replication session, giving some time
M1.modify_s(test_user.dn,[(ldap.MOD_ADD, 'description', b'value_1_1')])
M2.modify_s(test_user.dn,[(ldap.MOD_ADD, 'description', b'value_2_2')])
M1.modify_s(create_user.dn,[(ldap.MOD_ADD, 'description', b'value_1_1')])
M2.modify_s(create_user.dn,[(ldap.MOD_ADD, 'description', b'value_2_2')])
time.sleep(10)
# Check replication is working
ents = M1.search_s(test_user.dn, ldap.SCOPE_BASE, '(objectclass=*)')
ents = M1.search_s(create_user.dn, ldap.SCOPE_BASE, '(objectclass=*)')
for ent in ents:
assert (ent.hasAttr('description'))
found = 0
......@@ -415,7 +415,7 @@ def test_fetch_bindDnGroup(topo_m2):
found = found + 1
assert (found == 2)
ents = M2.search_s(test_user.dn, ldap.SCOPE_BASE, '(objectclass=*)')
ents = M2.search_s(create_user.dn, ldap.SCOPE_BASE, '(objectclass=*)')
for ent in ents:
assert (ent.hasAttr('description'))
found = 0
......@@ -559,4 +559,3 @@ if __name__ == '__main__':
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s %s" % CURRENT_FILE)
......@@ -17,7 +17,7 @@ TEST_USER_PWD = 'simplepaged_test'
@pytest.fixture(scope="module")
def test_user(topology_st):
def create_user(topology_st):
"""User for binding operation"""
try:
......@@ -153,7 +153,7 @@ def paged_search(topology_st, controls, search_flt, searchreq_attrlist):
return all_results
def test_ticket48808(topology_st, test_user):
def test_ticket48808(topology_st, create_user):
log.info('Run multiple paging controls on a single connection')
users_num = 100
page_size = 30
......
......@@ -360,7 +360,7 @@ sub add {
return $self->write();
}
if (exists($self->{$ndn})) {
if ($ndn && exists($self->{$ndn})) {
$self->setErrorCode(LDAP_ALREADY_EXISTS);
return 0;
}
......
......@@ -16,22 +16,9 @@
import itertools
import re
from enum import IntEnum
import gdb
from gdb.FrameDecorator import FrameDecorator
class LDAPFilter(IntEnum):
PRESENT = 0x87
APPROX = 0xa8
LE = 0xa6
GE = 0xa5
SUBSTRINGS = 0xa4
EQUALITY = 0xa3
NOT = 0xa2
OR = 0xa1
AND = 0xa0
class DSAccessLog (gdb.Command):
"""Display the Directory Server access log."""
def __init__ (self):
......@@ -127,64 +114,7 @@ class DSIdleFilter():
frame_iter = map(DSIdleFilterDecorator, frame_iter)
return frame_iter
class DSFilterPrint (gdb.Command):
"""Display a filter's contents"""
def __init__ (self):
super (DSFilterPrint, self).__init__ ("ds-filter-print", gdb.COMMAND_DATA)
def display_filter(self, filter_element, depth=0):
pad = " " * depth
# Extract the choice, that determines what we access next.
f_choice = filter_element['f_choice']
f_un = filter_element['f_un']
f_flags = filter_element['f_flags']
if f_choice == LDAPFilter.PRESENT:
print("%s(%s=*) flags:%s" % (pad, f_un['f_un_type'], f_flags))
elif f_choice == LDAPFilter.APPROX:
print("%sAPPROX ???" % pad)
elif f_choice == LDAPFilter.LE:
print("%sLE ???" % pad)
elif f_choice == LDAPFilter.GE:
print("%sGE ???" % pad)
elif f_choice == LDAPFilter.SUBSTRINGS:
f_un_sub = f_un['f_un_sub']
value = f_un_sub['sf_initial']
print("%s(%s=%s*) flags:%s" % (pad, f_un_sub['sf_type'], value, f_flags))
elif f_choice == LDAPFilter.EQUALITY:
f_un_ava = f_un['f_un_ava']
value = f_un_ava['ava_value']['bv_val']
print("%s(%s=%s) flags:%s" % (pad, f_un_ava['ava_type'], value, f_flags))
elif f_choice == LDAPFilter.NOT:
print("%sNOT ???" % pad)
elif f_choice == LDAPFilter.OR:
print("%s(| flags:%s" % (pad, f_flags))
filter_child = f_un['f_un_complex'].dereference()
self.display_filter(filter_child, depth + 4)
print("%s)" % pad)
elif f_choice == LDAPFilter.AND:
# Our child filter is in f_un_complex.
print("%s(& flags:%s" % (pad, f_flags))
filter_child = f_un['f_un_complex'].dereference()
self.display_filter(filter_child, depth + 4)
print("%s)" % pad)
else:
print("Corrupted filter, no such value %s" % f_choice)
f_next = filter_element['f_next']
if f_next != 0:
self.display_filter(f_next.dereference(), depth)
def invoke (self, arg, from_tty):
# Select our program state
gdb.newest_frame()
cur_frame = gdb.selected_frame()
# We are given the name of a filter, so we need to look up that symbol.
filter_val = cur_frame.read_var(arg)
filter_root = filter_val.dereference()
self.display_filter(filter_root)
DSAccessLog()
DSBacktrace()
DSIdleFilter()
DSFilterPrint()
......@@ -531,10 +531,8 @@ or_filter_create(Slapi_PBlock *pb)
default:
break;
}
for (; len > 0 && *val != ' '; ++val, --len)
for (; len > 0 && *val == ' '; ++val, --len)
;
if (len > 0)
++val, --len; /* skip the space */
bv.bv_len = len;
bv.bv_val = (len > 0) ? val : NULL;
} else { /* mrOID does not identify an ordering rule. */
......
......@@ -1852,7 +1852,7 @@ _cl5AppInit(void)
{
int rc = -1; /* initialize to failure */
DB_ENV *dbEnv = NULL;
size_t pagesize = 0;
uint32_t pagesize = 0;
int openflags = 0;
char *cookie = NULL;
Slapi_Backend *be = slapi_get_first_backend(&cookie);
......