Skip to content
Commits on Source (684)
This diff is collapsed.
=======================================================================
389 Directory Server
=======================================================================
The 389 Directory Server is subject to the terms detailed in the
license agreement file called LICENSE.
Late-breaking news and information on the 389 Directory Server is
available on our wiki page:
http://www.port389.org/
389 Directory Server
====================
389 Directory Server is a highly usable, fully featured, reliable
and secure LDAP server implementation. It handles many of the
largest LDAP deployments in the world.
All our code has been extensively tested with sanitisation tools.
As well as a rich feature set of fail-over and backup technologies
gives administrators confidence their accounts are safe.
License
-------
The 389 Directory Server is subject to the terms detailed in the
license agreement file called LICENSE.
Late-breaking news and information on the 389 Directory Server is
available on our wiki page:
http://www.port389.org/
Building
--------
autoreconf -fiv
./configure --enable-debug --with-openldap --enable-cmocka --enable-asan
make
make lib389
make check
sudo make install
sudo make lib389-install
Testing
-------
sudo py.test -s 389-ds-base/dirsrvtests/tests/suites/basic/
More information
----------------
Please see our contributing guide online:
http://www.port389.org/docs/389ds/contributing.html
......@@ -9,8 +9,8 @@ vendor="389 Project"
# PACKAGE_VERSION is constructed from these
VERSION_MAJOR=1
VERSION_MINOR=3
VERSION_MAINT=7.4
VERSION_MINOR=4
VERSION_MAINT=0.15
# NOTE: VERSION_PREREL is automatically set for builds made out of a git tree
VERSION_PREREL=
VERSION_DATE=$(date -u +%Y%m%d)
......
......@@ -24,8 +24,12 @@ AC_SUBST([CONSOLE_VERSION])
AM_MAINTAINER_MODE
AC_CANONICAL_HOST
AC_CONFIG_MACRO_DIRS([m4])
# Checks for programs.
: ${CXXFLAGS=""}
AC_PROG_CXX
: ${CFLAGS=""}
AC_PROG_CC
AM_PROG_CC_C_O
AM_PROG_AS
......@@ -43,6 +47,7 @@ AC_CHECK_HEADERS([arpa/inet.h errno.h fcntl.h malloc.h netdb.h netinet/in.h stdl
# These are *required* headers without option.
AC_CHECK_HEADERS([inttypes.h], [], AC_MSG_ERROR([unable to locate required header inttypes.h]))
AC_CHECK_HEADERS([crack.h], [], AC_MSG_ERROR([unable to locate required header crack.h]))
# Checks for typedefs, structures, and compiler characteristics.
......@@ -78,31 +83,130 @@ AC_CHECK_FUNCS([clock_gettime], [], AC_MSG_ERROR([unable to locate required symb
# This will detect if we need to add the LIBADD_DL value for us.
LT_LIB_DLLOAD
# Optional rust component support.
AC_MSG_CHECKING(for --enable-rust)
AC_ARG_ENABLE(rust, AS_HELP_STRING([--enable-rust], [Enable rust language features (default: no)]),
[
AC_CHECK_PROG(CARGO, [cargo], [yes], [no])
AC_CHECK_PROG(RUSTC, [rustc], [yes], [no])
AS_IF([test "$CARGO" != "yes" -o "$RUSTC" != "yes"], [
AC_MSG_FAILURE("Rust based plugins cannot be built cargo=$CARGO rustc=$RUSTC")
])
with_rust=yes
AC_MSG_RESULT(yes)
],
[
AC_MSG_RESULT(no)
])
AM_CONDITIONAL([RUST_ENABLE],[test -n "$with_rust"])
AC_MSG_CHECKING(for --enable-debug)
AC_ARG_ENABLE(debug, AS_HELP_STRING([--enable-debug], [Enable debug features (default: no)]),
[
AC_MSG_RESULT(yes)
debug_defs="-g3 -DDEBUG -DMCC_DEBUG -O0"
debug_defs="-DDEBUG -DMCC_DEBUG"
debug_cflags="-g3 -O0"
debug_cxxflags="-g3 -O0"
debug_rust_defs="-C debuginfo=2"
cargo_defs=""
rust_target_dir="debug"
with_debug=yes
],
[
AC_MSG_RESULT(no)
debug_defs=""
# set the default safe CFLAGS that would be set by AC_PROG_CC otherwise
debug_cflags="-g -O2"
debug_cxxflags="-g -O2"
debug_rust_defs="-C debuginfo=2"
cargo_defs="--release"
rust_target_dir="release"
])
AC_SUBST([debug_defs])
AC_SUBST([debug_cflags])
AC_SUBST([debug_cxxflags])
AC_SUBST([debug_rust_defs])
AC_SUBST([cargo_defs])
AC_SUBST([rust_target_dir])
AM_CONDITIONAL([DEBUG],[test -n "$with_debug"])
AC_MSG_CHECKING(for --enable-asan)
AC_ARG_ENABLE(asan, AS_HELP_STRING([--enable-asan], [Enable gcc address sanitizer options (default: no)]),
AC_ARG_ENABLE(asan, AS_HELP_STRING([--enable-asan], [Enable gcc/clang address sanitizer options (default: no)]),
[
AC_MSG_RESULT(yes)
asan_defs="-fsanitize=address -fno-omit-frame-pointer"
asan_cflags="-fsanitize=address -fno-omit-frame-pointer"
asan_rust_defs="-Z sanitizer=address"
],
[
AC_MSG_RESULT(no)
asan_defs=""
asan_cflags=""
asan_rust_defs=""
])
AC_SUBST([asan_defs])
AC_SUBST([asan_cflags])
AC_SUBST([asan_rust_defs])
AM_CONDITIONAL(enable_asan,test "$enable_asan" = "yes")
AC_MSG_CHECKING(for --enable-msan)
AC_ARG_ENABLE(msan, AS_HELP_STRING([--enable-msan], [Enable gcc/clang memory sanitizer options (default: no)]),
[
AC_MSG_RESULT(yes)
msan_cflags="-fsanitize=memory -fsanitize-memory-track-origins -fno-omit-frame-pointer"
msan_rust_defs="-Z sanitizer=memory"
],
[
AC_MSG_RESULT(no)
msan_cflags=""
msan_rust_defs=""
])
AC_SUBST([msan_cflags])
AC_SUBST([msan_rust_defs])
AM_CONDITIONAL(enable_msan,test "$enable_msan" = "yes")
AC_MSG_CHECKING(for --enable-tsan)
AC_ARG_ENABLE(tsan, AS_HELP_STRING([--enable-tsan], [Enable gcc/clang thread sanitizer options (default: no)]),
[
AC_MSG_RESULT(yes)
tsan_cflags="-fsanitize=thread -fno-omit-frame-pointer"
tsan_rust_defs="-Z sanitizer=thread"
],
[
AC_MSG_RESULT(no)
tsan_cflags=""
tsan_rust_defs=""
])
AC_SUBST([tsan_cflags])
AC_SUBST([tsan_rust_defs])
AM_CONDITIONAL(enable_tsan,test "$enable_tsan" = "yes")
AC_MSG_CHECKING(for --enable-ubsan)
AC_ARG_ENABLE(ubsan, AS_HELP_STRING([--enable-tsan], [Enable gcc/clang undefined behaviour sanitizer options (default: no)]),
[
AC_MSG_RESULT(yes)
ubsan_cflags="-fsanitize=undefined -fno-omit-frame-pointer"
ubsan_rust_defs=""
],
[
AC_MSG_RESULT(no)
ubsan_cflags=""
ubsan_rust_defs=""
])
AC_SUBST([ubsan_cflags])
AC_SUBST([ubsan_rust_defs])
AM_CONDITIONAL(enable_ubsan,test "$enable_ubsan" = "yes")
# Enable CLANG
AC_MSG_CHECKING(for --enable-clang)
AC_ARG_ENABLE(clang, AS_HELP_STRING([--enable-clang], [Enable clang (default: no)]),
[
AC_MSG_RESULT(yes)
],
[
AC_MSG_RESULT(no)
])
AM_CONDITIONAL(CLANG_ENABLE,test "$enable_clang" = "yes")
# Enable Perl
if test -z "$enable_perl" ; then
enable_perl=yes
fi
......@@ -124,15 +228,19 @@ AC_ARG_ENABLE(gcc-security, AS_HELP_STRING([--enable-gcc-security], [Enable gcc
[
AC_MSG_RESULT(yes)
AM_COND_IF([RPM_HARDEND_CC],
[ gccsec_defs="-Wall -Wp,-D_FORITY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -Werror=format-security -specs=/usr/lib/rpm/redhat/redhat-hardened-cc1 " ],
[ gccsec_defs="-Wall -Wp,-D_FORITY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -Werror=format-security" ]
[ gccsec_cflags="-Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -Werror=format-security -specs=/usr/lib/rpm/redhat/redhat-hardened-cc1 " ],
[ gccsec_cflags="-Wall -Wp,-D_FORTIFY_SOURCE=2 -fexceptions -fstack-protector-strong --param=ssp-buffer-size=4 -grecord-gcc-switches -Werror=format-security" ]
)
],
[
# Without this, -fPIC doesn't work on generic fedora builds, --disable-gcc-sec.
AC_MSG_RESULT(no)
gccsec_defs=""
AM_COND_IF([RPM_HARDEND_CC],
[ gccsec_cflags="-specs=/usr/lib/rpm/redhat/redhat-hardened-cc1" ],
[ gccsec_cflags="" ]
)
])
AC_SUBST([gccsec_defs])
AC_SUBST([gccsec_cflags])
# Pull in profiling.
AC_MSG_CHECKING(for --enable-profiling)
......@@ -301,6 +409,7 @@ fi
m4_include(m4/fhs.m4)
localrundir='/run'
cockpitdir=/389-console
# installation paths - by default, we store everything
# under the prefix. The with-fhs option will use /usr,
......@@ -437,12 +546,12 @@ if test -n "$with_pythonexec"; then
if test "$with_pythonexec" = yes ; then
AC_MSG_ERROR([You must specify --with-pythonexec=/full/path/to/python])
elif test "$with_pythonexec" = no ; then
with_pythonexec=/usr/bin/python2
with_pythonexec=/usr/bin/python3
else
AC_MSG_RESULT([$with_pythonexec])
fi
else
with_pythonexec=/usr/bin/python2
with_pythonexec=/usr/bin/python3
fi
AC_SUBST(prefixdir)
......@@ -461,9 +570,9 @@ AC_SUBST(infdir)
AC_SUBST(mibdir)
AC_SUBST(mandir)
AC_SUBST(updatedir)
AC_SUBST(defaultuser)
AC_SUBST(defaultgroup)
AC_SUBST(cockpitdir)
# check for --with-instconfigdir
AC_MSG_CHECKING(for --with-instconfigdir)
......@@ -522,7 +631,7 @@ fi
if test -n "$with_pythonexec"; then
pythonexec="$with_pythonexec"
else
pythonexec='/usr/bin/env python2'
pythonexec='/usr/bin/python3'
fi
# Default to no atomic queue operations.
......@@ -544,7 +653,6 @@ case $host in
platform="linux"
initdir='$(sysconfdir)/rc.d/init.d'
# do arch specific linux stuff here
# TCMalloc is only on i686, x86_64, ppc64 and arm, so we pick that here.
case $host in
i*86-*-linux*)
AC_DEFINE([CPU_x86], [], [cpu type x86])
......@@ -733,7 +841,6 @@ m4_include(m4/openldap.m4)
m4_include(m4/mozldap.m4)
m4_include(m4/db.m4)
m4_include(m4/sasl.m4)
m4_include(m4/svrcore.m4)
m4_include(m4/icu.m4)
m4_include(m4/netsnmp.m4)
m4_include(m4/kerberos.m4)
......@@ -742,7 +849,6 @@ m4_include(m4/selinux.m4)
m4_include(m4/systemd.m4)
m4_include(m4/cmocka.m4)
m4_include(m4/doxygen.m4)
m4_include(m4/tcmalloc.m4)
PACKAGE_BASE_VERSION=`echo $PACKAGE_VERSION | awk -F\. '{print $1"."$2}'`
AC_SUBST(PACKAGE_BASE_VERSION)
......@@ -794,7 +900,7 @@ AC_DEFINE([LDAP_ERROR_LOGGING], [1], [LDAP error logging flag])
# AC_CONFIG_FILES([ldap/admin/src/defaults.inf])
AC_CONFIG_FILES([src/pkgconfig/dirsrv.pc src/pkgconfig/nunc-stans.pc src/pkgconfig/libsds.pc])
AC_CONFIG_FILES([src/pkgconfig/dirsrv.pc src/pkgconfig/nunc-stans.pc src/pkgconfig/libsds.pc src/pkgconfig/svrcore.pc])
AC_CONFIG_FILES([Makefile rpm/389-ds-base.spec ])
......
import subprocess
import logging
import pytest
import os
from enum import Enum
pkgs = ['389-ds-base', 'nss', 'nspr', 'openldap', 'cyrus-sasl']
class FIPSState(Enum):
ENABLED = 'enabled'
DISABLED = 'disabled'
NOT_AVAILABLE = 'not_available'
def __unicode__(self):
return self.value
def __str__(self):
return self.value
def get_rpm_version(pkg):
try:
result = subprocess.check_output(['rpm', '-q', '--queryformat',
'%{VERSION}-%{RELEASE}', pkg])
except:
result = b"not installed"
return result.decode('utf-8')
def is_fips():
# Are we running in FIPS mode?
if not os.path.exists('/proc/sys/crypto/fips_enabled'):
return FIPSState.NOT_AVAILABLE
state = None
with open('/proc/sys/crypto/fips_enabled', 'r') as f:
state = f.readline()
if state == '1':
return FIPSState.ENABLED
else:
return FIPSState.DISABLED
@pytest.fixture(autouse=True)
def _environment(request):
if "_metadata" in dir(request.config):
for pkg in pkgs:
request.config._metadata[pkg] = get_rpm_version(pkg)
request.config._metadata['FIPS'] = is_fips()
def pytest_cmdline_main(config):
logging.basicConfig(level=logging.DEBUG)
def pytest_report_header(config):
header = ""
for pkg in pkgs:
header += "%s: %s\n" % (pkg, get_rpm_version(pkg))
header += "FIPS: %s" % is_fips()
return header
@pytest.mark.optionalhook
def pytest_html_results_table_header(cells):
cells.pop()
@pytest.mark.optionalhook
def pytest_html_results_table_row(report, cells):
cells.pop()
......@@ -61,24 +61,45 @@ def writeFinalizer():
def get_existing_topologies(inst, masters, hubs, consumers):
"""Check if the requested topology exists"""
setup_text = ""
if inst:
if inst == 1:
i = 'st'
setup_text = "Standalone Instance"
else:
i = 'i{}'.format(inst)
setup_text = "{} Standalone Instances".format(inst)
else:
i = ''
if masters:
ms = 'm{}'.format(masters)
if len(setup_text) > 0:
setup_text += ", "
if masters == 1:
setup_text += "Master Instance"
else:
setup_text += "{} Master Instances".format(masters)
else:
ms = ''
if hubs:
hs = 'h{}'.format(hubs)
if len(setup_text) > 0:
setup_text += ", "
if hubs == 1:
setup_text += "Hub Instance"
else:
setup_text += "{} Hub Instances".format(hubs)
else:
hs = ''
if consumers:
cs = 'c{}'.format(consumers)
if len(setup_text) > 0:
setup_text += ", "
if consumers == 1:
setup_text += "Consumer Instance"
else:
setup_text += "{} Consumer Instances".format(consumers)
else:
cs = ''
......@@ -86,9 +107,9 @@ def get_existing_topologies(inst, masters, hubs, consumers):
# Returns True in the first element of a list, if topology was found
if my_topology in dir(topologies):
return [True, my_topology]
return [True, my_topology, setup_text]
else:
return [False, my_topology]
return [False, my_topology, setup_text]
def check_id_uniqueness(id_value):
......@@ -181,10 +202,11 @@ if len(sys.argv) > 0:
# Extract usable values
ticket = args.ticket
suite = args.suite
if args.inst == '0' and args.masters == '0' and args.hubs == '0' \
and args.consumers == '0':
instances = 1
my_topology = [True, 'topology_st']
my_topology = [True, 'topology_st', "Standalone Instance"]
else:
instances = int(args.inst)
masters = int(args.masters)
......@@ -192,6 +214,7 @@ if len(sys.argv) > 0:
consumers = int(args.consumers)
my_topology = get_existing_topologies(instances, masters, hubs, consumers)
filename = args.filename
setup_text = my_topology[2]
# Create/open the new test script file
if not filename:
......@@ -210,9 +233,10 @@ if len(sys.argv) > 0:
if my_topology[0]:
topology_import = 'from lib389.topologies import {} as topo\n'.format(my_topology[1])
else:
topology_import = ''
topology_import = 'from lib389.topologies import create_topology\n'
TEST.write('import logging\nimport pytest\nimport os\n')
TEST.write('from lib389._constants import *\n')
TEST.write('{}\n'.format(topology_import))
TEST.write('DEBUGGING = os.getenv("DEBUGGING", default=False)\n')
......@@ -220,7 +244,7 @@ if len(sys.argv) > 0:
TEST.write(' logging.getLogger(__name__).setLevel(logging.DEBUG)\n')
TEST.write('else:\n')
TEST.write(' logging.getLogger(__name__).setLevel(logging.INFO)\n')
TEST.write('log = logging.getLogger(__name__)\n\n\n')
TEST.write('log = logging.getLogger(__name__)\n\n')
# Add topology function for non existing (in lib389/topologies.py) topologies only
if not my_topology[0]:
......@@ -236,7 +260,7 @@ if len(sys.argv) > 0:
topologies_str += " {} standalone instances".format(instances)
# Write the 'topology function'
TEST.write('@pytest.fixture(scope="module")\n')
TEST.write('\n@pytest.fixture(scope="module")\n')
TEST.write('def topo(request):\n')
TEST.write(' """Create a topology with{}"""\n\n'.format(topologies_str))
TEST.write(' topology = create_topology({\n')
......@@ -255,6 +279,7 @@ if len(sys.argv) > 0:
TEST.write(' # replicas.test(DEFAULT_SUFFIX, topology.cs["consumer1"])\n')
writeFinalizer()
TEST.write(' return topology\n\n')
tc_id = '0'
while not check_id_uniqueness(tc_id): tc_id = uuid.uuid4()
......@@ -266,7 +291,7 @@ if len(sys.argv) > 0:
TEST.write('\ndef test_something(topo):\n')
TEST.write(' """Specify a test case purpose or name here\n\n')
TEST.write(' :id: {}\n'.format(tc_id))
TEST.write(' :setup: Fill in set up configuration here\n')
TEST.write(' :setup: ' + setup_text + '\n')
TEST.write(' :steps:\n')
TEST.write(' 1. Fill in test case steps here\n')
TEST.write(' 2. And indent them like this (RST format requirement)\n')
......@@ -289,7 +314,7 @@ if len(sys.argv) > 0:
TEST.write(' # Run isolated\n')
TEST.write(' # -s for DEBUG mode\n')
TEST.write(' CURRENT_FILE = os.path.realpath(__file__)\n')
TEST.write(' pytest.main("-s %s" % CURRENT_FILE)\n\n')
TEST.write(' pytest.main(["-s", CURRENT_FILE])\n\n')
# Done, close things up
TEST.close()
......
This diff is collapsed.
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2017 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import pytest
from lib389.topologies import topology_st
from lib389.plugins import ClassOfServicePlugin
from lib389.cos import CosIndirectDefinitions, CosTemplates, CosTemplate
from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
from lib389.idm.organizationalunit import OrganizationalUnits
from lib389._constants import DEFAULT_SUFFIX
import time
# Given this should complete is about 0.005, this is generous.
# For the final test with 20 templates, about 0.02 is an acceptable time.
THRESHOLD = 0.05
class OUCosTemplate(CosTemplate):
def __init__(self, instance, dn=None):
"""Create a OU specific cos template to replicate a specific user setup.
This template provides ou attrs onto the target entry.
:param instance: A dirsrv instance
:type instance: DirSrv
:param dn: The dn of the template
:type dn: str
"""
super(OUCosTemplate, self).__init__(instance, dn)
self._rdn_attribute = 'ou'
self._must_attributes = ['ou']
self._create_objectclasses = [
'top',
'cosTemplate',
'organizationalUnit',
]
class OUCosTemplates(CosTemplates):
def __init__(self, instance, basedn, rdn=None):
"""Create an OU specific cos templates to replicate a specific use setup.
This costemplates object allows access to the OUCosTemplate types.
:param instance: A dirsrv instance
:type instance: DirSrv
:param basedn: The basedn of the templates
:type basedn: str
:param rdn: The rdn of the templates
:type rdn: str
"""
super(OUCosTemplates, self).__init__(instance, basedn, rdn)
self._objectclasses = [
'cosTemplate',
'organizationalUnit',
]
self._filterattrs = ['ou']
self._childobject = OUCosTemplate
def test_indirect_template_scale(topology_st):
"""Test that cos templates can be added at a reasonable scale
:id: 7cbcdf22-1f9c-4222-9e76-685fe374fc20
:steps:
1. Enable COS plugin
2. Create the test user
3. Add an indirect cos template
4. Add a cos template
5. Add the user to the cos template and assert it works.
6. Add 25,000 templates to the database
7. Search the user. It should not exceed THRESHOLD.
:expected results:
1. It is enabled.
2. It is created.
3. Is is created.
4. It is created.
5. It is valid.
6. They are created.
7. It is fast.
"""
cos_plugin = ClassOfServicePlugin(topology_st.standalone)
cos_plugin.enable()
topology_st.standalone.restart()
# Now create, the indirect specifier, and a user to template onto.
users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX)
user = users.create(properties=TEST_USER_PROPERTIES)
cos_inds = CosIndirectDefinitions(topology_st.standalone, DEFAULT_SUFFIX)
cos_ind = cos_inds.create(properties={
'cn' : 'cosIndirectDef',
'cosIndirectSpecifier': 'seeAlso',
'cosAttribute': [
'ou merge-schemes',
'description merge-schemes',
'postalCode merge-schemes',
],
})
ous = OrganizationalUnits(topology_st.standalone, DEFAULT_SUFFIX)
ou_temp = ous.create(properties={'ou': 'templates'})
cos_temps = OUCosTemplates(topology_st.standalone, ou_temp.dn)
cos_temp_u = cos_temps.create(properties={
'ou' : 'ou_temp_u',
'description' : 'desc_temp_u',
'postalCode': '0'
})
# Edit the user to add the seeAlso ...
user.set('seeAlso', cos_temp_u.dn)
# Now create 25,0000 templates, they *don't* need to apply to the user though!
for i in range(1, 25001):
cos_temp_u = cos_temps.create(properties={
'ou' : 'ou_temp_%s' % i,
'description' : 'desc_temp_%s' % i,
'postalCode': '%s' % i
})
if i % 500 == 0:
start_time = time.monotonic()
u_search = users.get('testuser')
attrs = u_search.get_attr_vals_utf8('postalCode')
end_time = time.monotonic()
diff_time = end_time - start_time
assert diff_time < THRESHOLD
if i == 10000:
# Now add our user to this template also.
user.add('seeAlso', cos_temp_u.dn)
start_time = time.monotonic()
attrs_after = u_search.get_attr_vals_utf8('postalCode')
end_time = time.monotonic()
diff_time = end_time - start_time
assert(set(attrs) < set(attrs_after))
assert diff_time < THRESHOLD
......@@ -19,6 +19,8 @@ from lib389.properties import *
from lib389.tasks import *
from lib389.utils import *
from lib389.idm.directorymanager import DirectoryManager
logging.getLogger(__name__).setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s' +
' - %(message)s')
......@@ -34,6 +36,7 @@ CHECK_CONVERGENCE = True
ENABLE_VALGRIND = False
RUNNING = True
DEBUGGING = os.getenv('DEBUGGING', default=False)
class TopologyReplication(object):
def __init__(self, master1, master2):
......@@ -50,9 +53,10 @@ def topology(request):
args_instance[SER_DEPLOYED_DIR] = installation1_prefix
# Creating master 1...
master1 = DirSrv(verbose=False)
master1 = DirSrv(verbose=DEBUGGING)
args_instance[SER_HOST] = HOST_MASTER_1
args_instance[SER_PORT] = PORT_MASTER_1
args_instance[SER_SECURE_PORT] = SECUREPORT_MASTER_1
args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_1
args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
args_master = args_instance.copy()
......@@ -66,9 +70,10 @@ def topology(request):
replicaId=REPLICAID_MASTER_1)
# Creating master 2...
master2 = DirSrv(verbose=False)
master2 = DirSrv(verbose=DEBUGGING)
args_instance[SER_HOST] = HOST_MASTER_2
args_instance[SER_PORT] = PORT_MASTER_2
args_instance[SER_SECURE_PORT] = SECUREPORT_MASTER_2
args_instance[SER_SERVERID_PROP] = SERVERID_MASTER_2
args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
args_master = args_instance.copy()
......@@ -220,7 +225,8 @@ class AddDelUsers(threading.Thread):
idx = 0
RDN = 'uid=add_del_master_' + self.id + '-'
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
while idx < NUM_USERS:
USER_DN = RDN + str(idx) + ',' + DEFAULT_SUFFIX
try:
......@@ -236,7 +242,7 @@ class AddDelUsers(threading.Thread):
conn.close()
# Delete 5000 entries
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
idx = 0
while idx < NUM_USERS:
USER_DN = RDN + str(idx) + ',' + DEFAULT_SUFFIX
......@@ -259,7 +265,7 @@ class ModUsers(threading.Thread):
def run(self):
# Mod existing entries
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
idx = 0
while idx < NUM_USERS:
USER_DN = ('uid=master' + self.id + '_entry' + str(idx) + ',' +
......@@ -275,7 +281,7 @@ class ModUsers(threading.Thread):
conn.close()
# Modrdn existing entries
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
idx = 0
while idx < NUM_USERS:
USER_DN = ('uid=master' + self.id + '_entry' + str(idx) + ',' +
......@@ -290,7 +296,7 @@ class ModUsers(threading.Thread):
conn.close()
# Undo modrdn to we can rerun this test
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
idx = 0
while idx < NUM_USERS:
USER_DN = ('cn=master' + self.id + '_entry' + str(idx) + ',' +
......@@ -315,7 +321,7 @@ class DoSearches(threading.Thread):
def run(self):
# Equality
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
idx = 0
while idx < NUM_USERS:
search_filter = ('(|(uid=master' + self.id + '_entry' + str(idx) +
......@@ -333,7 +339,7 @@ class DoSearches(threading.Thread):
conn.close()
# Substring
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
idx = 0
while idx < NUM_USERS:
search_filter = ('(|(uid=master' + self.id + '_entry' + str(idx) +
......@@ -360,7 +366,7 @@ class DoFullSearches(threading.Thread):
def run(self):
global RUNNING
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind()
while RUNNING:
time.sleep(2)
try:
......
......@@ -12,8 +12,9 @@ from lib389._constants import *
from lib389.properties import *
from lib389.tasks import *
from lib389.utils import *
from lib389.idm.directorymanager import DirectoryManager
DEBUGGING = False
DEBUGGING = os.getenv('DEBUGGING', default=False)
if DEBUGGING:
logging.getLogger(__name__).setLevel(logging.DEBUG)
......@@ -41,12 +42,10 @@ def topology(request):
"""Create DS Deployment"""
# Creating standalone instance ...
if DEBUGGING:
standalone = DirSrv(verbose=True)
else:
standalone = DirSrv(verbose=False)
standalone = DirSrv(verbose=DEBUGGING)
args_instance[SER_HOST] = HOST_STANDALONE
args_instance[SER_PORT] = PORT_STANDALONE
args_instance[SER_SECURE_PORT] = SECUREPORT_STANDALONE
args_instance[SER_SERVERID_PROP] = SERVERID_STANDALONE
args_instance[SER_CREATION_SUFFIX] = DEFAULT_SUFFIX
args_standalone = args_instance.copy()
......@@ -129,7 +128,7 @@ class BindOnlyConn(threading.Thread):
global STOP
while idx < MAX_CONNS and not STOP:
try:
conn = self.inst.openConnection()
conn = DirectoryManager(self.inst).bind(connOnly=True)
conn.unbind_s()
time.sleep(.2)
err_count = 0
......@@ -160,7 +159,7 @@ class IdleConn(threading.Thread):
global STOP
while idx < (MAX_CONNS / 10) and not STOP:
try:
conn = self.inst.openConnection()
conn = self.inst.clone()
conn.simple_bind_s('uid=entry0,dc=example,dc=com', 'password')
conn.search_s('dc=example,dc=com', ldap.SCOPE_SUBTREE,
'uid=*')
......@@ -197,7 +196,7 @@ class LongConn(threading.Thread):
global STOP
while idx < MAX_CONNS and not STOP:
try:
conn = self.inst.openConnection()
conn = self.inst.clone()
conn.search_s('dc=example,dc=com', ldap.SCOPE_SUBTREE,
'objectclass=*')
conn.search_s('dc=example,dc=com', ldap.SCOPE_SUBTREE,
......
import logging
import pytest
import os
import ldap
import time
from lib389._constants import *
from lib389.topologies import topology_st as topo
from lib389.idm.user import UserAccount, UserAccounts, TEST_USER_PROPERTIES
from lib389.idm.domain import Domain
DEBUGGING = os.getenv("DEBUGGING", default=False)
if DEBUGGING:
logging.getLogger(__name__).setLevel(logging.DEBUG)
else:
logging.getLogger(__name__).setLevel(logging.INFO)
log = logging.getLogger(__name__)
BIND_DN2 = 'uid=tuser,ou=People,dc=example,dc=com'
BIND_RDN2 = 'tuser'
BIND_DN = 'uid=tuser1,ou=People,dc=example,dc=com'
BIND_RDN = 'tuser1'
SRCH_FILTER = "uid=tuser1"
SRCH_FILTER2 = "uid=tuser"
aci_list_A = ['(targetattr != "userPassword") (version 3.0; acl "Anonymous access"; allow (read, search, compare)userdn = "ldap:///anyone";)',
'(targetattr = "*") (version 3.0;acl "allow tuser";allow (all)(userdn = "ldap:///uid=tuser5,ou=People,dc=example,dc=com");)',
'(targetattr != "uid || mail") (version 3.0; acl "deny-attrs"; deny (all) (userdn = "ldap:///anyone");)',
'(targetfilter = "(inetUserStatus=1)") ( version 3.0; acl "deny-specific-entry"; deny(all) (userdn = "ldap:///anyone");)']
aci_list_B = ['(targetattr != "userPassword") (version 3.0; acl "Anonymous access"; allow (read, search, compare)userdn = "ldap:///anyone";)',
'(targetattr != "uid || mail") (version 3.0; acl "deny-attrs"; deny (all) (userdn = "ldap:///anyone");)',
'(targetfilter = "(inetUserStatus=1)") ( version 3.0; acl "deny-specific-entry"; deny(all) (userdn = "ldap:///anyone");)']
@pytest.fixture(scope="module")
def aci_setup(topo):
topo.standalone.log.info("Add {}".format(BIND_DN))
user = UserAccount(topo.standalone, BIND_DN)
user_props = TEST_USER_PROPERTIES.copy()
user_props.update({'sn': BIND_RDN,
'cn': BIND_RDN,
'uid': BIND_RDN,
'inetUserStatus': '1',
'objectclass': 'extensibleObject',
'userpassword': PASSWORD})
user.create(properties=user_props, basedn=SUFFIX)
topo.standalone.log.info("Add {}".format(BIND_DN2))
user2 = UserAccount(topo.standalone, BIND_DN2)
user_props = TEST_USER_PROPERTIES.copy()
user_props.update({'sn': BIND_RDN2,
'cn': BIND_RDN2,
'uid': BIND_RDN2,
'userpassword': PASSWORD})
user2.create(properties=user_props, basedn=SUFFIX)
def test_multi_deny_aci(topo, aci_setup):
"""Test that mutliple deny rules work, and that they the cache properly
stores the result
:id: 294c366d-850e-459e-b5a0-3cc828ec3aca
:setup: Standalone Instance
:steps:
1. Add aci_list_A aci's and verify two searches on the same connection
behave the same
2. Add aci_list_B aci's and verify search fails as expected
:expectedresults:
1. Both searches do not return any entries
2. Seaches do not return any entries
"""
if DEBUGGING:
# Maybe add aci logging?
pass
suffix = Domain(topo.standalone, DEFAULT_SUFFIX)
for run in range(2):
topo.standalone.log.info("Pass " + str(run + 1))
# Test ACI List A
topo.standalone.log.info("Testing two searches behave the same...")
topo.standalone.simple_bind_s(DN_DM, PASSWORD)
suffix.set('aci', aci_list_A, ldap.MOD_REPLACE)
time.sleep(1)
topo.standalone.simple_bind_s(BIND_DN, PASSWORD)
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as good user")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as good user")
assert False
# Bind a different user who has rights
topo.standalone.simple_bind_s(BIND_DN2, PASSWORD)
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as good user")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as good user (2)")
assert False
if run > 0:
# Second pass
topo.standalone.restart()
# Reset ACI's and do the second test
topo.standalone.log.info("Testing search does not return any entries...")
topo.standalone.simple_bind_s(DN_DM, PASSWORD)
suffix.set('aci', aci_list_B, ldap.MOD_REPLACE)
time.sleep(1)
topo.standalone.simple_bind_s(BIND_DN, PASSWORD)
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
assert False
if run > 0:
# Second pass
topo.standalone.restart()
# Bind as different user who has rights
topo.standalone.simple_bind_s(BIND_DN2, PASSWORD)
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as good user")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as good user (2)")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
assert False
# back to user 1
topo.standalone.simple_bind_s(BIND_DN, PASSWORD)
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as user1")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
if entries is None or len(entries) == 0:
topo.standalone.log.fatal("Failed to get entry as user1 (2)")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
assert False
entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
if entries and entries[0]:
topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
assert False
topo.standalone.log.info("Test PASSED")
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main(["-s", CURRENT_FILE])
This diff is collapsed.
# --- BEGIN COPYRIGHT BLOCK ---
# Copyright (C) 2016 Red Hat, Inc.
# All rights reserved.
#
# License: GPL (version 3 or any later version).
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import pytest
from lib389.tasks import *
from lib389.utils import *
from lib389.topologies import topology_st
logging.getLogger(__name__).setLevel(logging.DEBUG)
log = logging.getLogger(__name__)
CONTAINER_1_OU = 'test_ou_1'
CONTAINER_2_OU = 'test_ou_2'
CONTAINER_1 = f'ou={CONTAINER_1_OU},dc=example,dc=com'
CONTAINER_2 = f'ou={CONTAINER_2_OU},dc=example,dc=com'
USER_CN = 'test_user'
USER_PWD = 'Secret123'
USER = f'cn={USER_CN},{CONTAINER_1}'
@pytest.fixture(scope="module")
def env_setup(topology_st):
"""Adds two containers, one user and two ACI rules"""
log.info("Add a container: %s" % CONTAINER_1)
topology_st.standalone.add_s(Entry((CONTAINER_1,
{'objectclass': 'top',
'objectclass': 'organizationalunit',
'ou': CONTAINER_1_OU,
})))
log.info("Add a container: %s" % CONTAINER_2)
topology_st.standalone.add_s(Entry((CONTAINER_2,
{'objectclass': 'top',
'objectclass': 'organizationalunit',
'ou': CONTAINER_2_OU,
})))
log.info("Add a user: %s" % USER)
topology_st.standalone.add_s(Entry((USER,
{'objectclass': 'top person'.split(),
'cn': USER_CN,
'sn': USER_CN,
'userpassword': USER_PWD
})))
ACI_TARGET = '(targetattr="*")'
ACI_ALLOW = '(version 3.0; acl "All rights for %s"; allow (all) ' % USER
ACI_SUBJECT = 'userdn="ldap:///%s";)' % USER
ACI_BODY = ACI_TARGET + ACI_ALLOW + ACI_SUBJECT
mod = [(ldap.MOD_ADD, 'aci', ensure_bytes(ACI_BODY))]
log.info("Add an ACI 'allow (all)' by %s to the %s" % (USER,
CONTAINER_1))
topology_st.standalone.modify_s(CONTAINER_1, mod)
log.info("Add an ACI 'allow (all)' by %s to the %s" % (USER,
CONTAINER_2))
topology_st.standalone.modify_s(CONTAINER_2, mod)
@pytest.mark.ds47553
def test_enhanced_aci_modrnd(topology_st, env_setup):
"""Tests, that MODRDN operation is allowed,
if user has ACI right '(all)' under superior entries,
but doesn't have '(modrdn)'
:id: 492cf2a9-2efe-4e3b-955e-85eca61d66b9
:setup: Standalone instance
:steps:
1. Create two containers
2. Create a user within "ou=test_ou_1,dc=example,dc=com"
3. Add an aci with a rule "cn=test_user is allowed all" within these containers
4. Run MODRDN operation on the "cn=test_user" and set "newsuperior" to
the "ou=test_ou_2,dc=example,dc=com"
5. Check there is no user under container one (ou=test_ou_1,dc=example,dc=com)
6. Check there is a user under container two (ou=test_ou_2,dc=example,dc=com)
:expectedresults:
1. Two containers should be created
2. User should be added successfully
3. This should pass
4. This should pass
5. User should not be found under container ou=test_ou_1,dc=example,dc=com
6. User should be found under container ou=test_ou_2,dc=example,dc=com
"""
log.info("Bind as %s" % USER)
topology_st.standalone.simple_bind_s(USER, USER_PWD)
log.info("User MODRDN operation from %s to %s" % (CONTAINER_1,
CONTAINER_2))
topology_st.standalone.rename_s(USER, "cn=%s" % USER_CN,
newsuperior=CONTAINER_2, delold=1)
log.info("Check there is no user in %s" % CONTAINER_1)
entries = topology_st.standalone.search_s(CONTAINER_1,
ldap.SCOPE_ONELEVEL,
'cn=%s' % USER_CN)
assert not entries
log.info("Check there is our user in %s" % CONTAINER_2)
entries = topology_st.standalone.search_s(CONTAINER_2,
ldap.SCOPE_ONELEVEL,
'cn=%s' % USER_CN)
assert entries
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
# -v for additional verbose
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s -v %s" % CURRENT_FILE)
......@@ -34,6 +34,8 @@ BOGUSSUFFIX = 'uid=bogus,ou=people,dc=bogus'
GROUPOU = 'ou=groups,%s' % DEFAULT_SUFFIX
BOGUSOU = 'ou=OU,%s' % DEFAULT_SUFFIX
def get_ldap_error_msg(e, type):
return e.args[0][type]
def pattern_accesslog(file, log_pattern):
for i in range(5):
......@@ -111,7 +113,7 @@ def check_op_result(server, op, dn, superior, exists, rc):
server.add_s(Entry((dn, {'objectclass': 'top extensibleObject'.split(),
'cn': 'test entry'})))
elif op == 'modify':
server.modify_s(dn, [(ldap.MOD_REPLACE, 'description', 'test')])
server.modify_s(dn, [(ldap.MOD_REPLACE, 'description', b'test')])
elif op == 'modrdn':
if superior is not None:
server.rename_s(dn, 'uid=new', newsuperior=superior, delold=1)
......@@ -125,10 +127,10 @@ def check_op_result(server, op, dn, superior, exists, rc):
except ldap.LDAPError as e:
hit = 1
log.info("Exception (expected): %s" % type(e).__name__)
log.info('Desc ' + e.message['desc'])
log.info('Desc {}'.format(get_ldap_error_msg(e,'desc')))
assert isinstance(e, rc)
if 'matched' in e.message:
log.info('Matched is returned: ' + e.message['matched'])
if 'matched' in e.args:
log.info('Matched is returned: {}'.format(get_ldap_error_msg(e, 'matched')))
if rc != ldap.NO_SUCH_OBJECT:
assert False
......@@ -144,14 +146,43 @@ def check_op_result(server, op, dn, superior, exists, rc):
log.info('PASSED\n')
def test_ticket1347760(topology_st):
"""
Prevent revealing the entry info to whom has no access rights.
@pytest.mark.bz1347760
def test_repeated_ldap_add(topology_st):
"""Prevent revealing the entry info to whom has no access rights.
:id: 76d278bd-3e51-4579-951a-753e6703b4df
:setup: Standalone instance
:steps:
1. Disable accesslog logbuffering
2. Bind as "cn=Directory Manager"
3. Add a organisational unit as BOU
4. Add a bind user as uid=buser123,ou=BOU,dc=example,dc=com
5. Add a test user as uid=tuser0,ou=People,dc=example,dc=com
6. Delete aci in dc=example,dc=com
7. Bind as Directory Manager, acquire an access log path and instance dir
8. Bind as uid=buser123,ou=BOU,dc=example,dc=com who has no right to read the entry
9. Bind as uid=bogus,ou=people,dc=bogus,bogus who does not exist
10. Bind as uid=buser123,ou=BOU,dc=example,dc=com,bogus with wrong password
11. Adding aci for uid=buser123,ou=BOU,dc=example,dc=com to ou=BOU,dc=example,dc=com.
12. Bind as uid=buser123,ou=BOU,dc=example,dc=com now who has right to read the entry
:expectedresults:
1. Operation should be successful
2. Operation should be successful
3. Operation should be successful
4. Operation should be successful
5. Operation should be successful
6. Operation should be successful
7. Operation should be successful
8. Bind operation should be successful with no search result
9. Bind operation should Fail
10. Bind operation should Fail
11. Operation should be successful
12. Bind operation should be successful with search result
"""
log.info('Testing Bug 1347760 - Information disclosure via repeated use of LDAP ADD operation, etc.')
log.info('Disabling accesslog logbuffering')
topology_st.standalone.modify_s(CONFIG_DN, [(ldap.MOD_REPLACE, 'nsslapd-accesslog-logbuffering', 'off')])
topology_st.standalone.modify_s(CONFIG_DN, [(ldap.MOD_REPLACE, 'nsslapd-accesslog-logbuffering', b'off')])
log.info('Bind as {%s,%s}' % (DN_DM, PASSWORD))
topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
......@@ -189,7 +220,7 @@ def test_ticket1347760(topology_st):
try:
topology_st.standalone.simple_bind_s(BINDDN, BINDPW)
except ldap.LDAPError as e:
log.info('Desc ' + e.message['desc'])
log.info('Desc {}'.format(get_ldap_error_msg(e,'desc')))
assert False
file_obj = open(file_path, "r")
......@@ -202,7 +233,7 @@ def test_ticket1347760(topology_st):
topology_st.standalone.simple_bind_s(BOGUSDN, 'bogus')
except ldap.LDAPError as e:
log.info("Exception (expected): %s" % type(e).__name__)
log.info('Desc ' + e.message['desc'])
log.info('Desc {}'.format(get_ldap_error_msg(e,'desc')))
assert isinstance(e, ldap.INVALID_CREDENTIALS)
regex = re.compile('No such entry')
cause = pattern_accesslog(file_obj, regex)
......@@ -234,7 +265,7 @@ def test_ticket1347760(topology_st):
topology_st.standalone.simple_bind_s(BINDDN, 'bogus')
except ldap.LDAPError as e:
log.info("Exception (expected): %s" % type(e).__name__)
log.info('Desc ' + e.message['desc'])
log.info('Desc {}'.format(get_ldap_error_msg(e,'desc')))
assert isinstance(e, ldap.INVALID_CREDENTIALS)
regex = re.compile('Invalid credentials')
cause = pattern_accesslog(file_obj, regex)
......@@ -250,7 +281,7 @@ def test_ticket1347760(topology_st):
log.info('aci: %s' % acival)
log.info('Bind as {%s,%s}' % (DN_DM, PASSWORD))
topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
topology_st.standalone.modify_s(BINDOU, [(ldap.MOD_ADD, 'aci', acival)])
topology_st.standalone.modify_s(BINDOU, [(ldap.MOD_ADD, 'aci', ensure_bytes(acival))])
time.sleep(1)
log.info('Bind case 3. the bind user has the right to read the entry itself, bind should be successful.')
......@@ -376,14 +407,14 @@ def test_ticket1347760(topology_st):
acival = '(targetattr="*")(version 3.0; acl "%s-all"; allow(all) userdn = "ldap:///%s";)' % (BUID, BINDDN)
log.info('Bind as {%s,%s}' % (DN_DM, PASSWORD))
topology_st.standalone.simple_bind_s(DN_DM, PASSWORD)
topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', acival)])
topology_st.standalone.modify_s(DEFAULT_SUFFIX, [(ldap.MOD_ADD, 'aci', ensure_bytes(acival))])
time.sleep(1)
log.info('Bind as {%s,%s}.' % (BINDDN, BINDPW))
try:
topology_st.standalone.simple_bind_s(BINDDN, BINDPW)
except ldap.LDAPError as e:
log.info('Desc ' + e.message['desc'])
log.info('Desc {}'.format(get_ldap_error_msg(e,'desc')))
assert False
time.sleep(1)
......@@ -434,7 +465,7 @@ def test_ticket1347760(topology_st):
topology_st.standalone.simple_bind_s(BINDDN, BUID)
except ldap.LDAPError as e:
log.info("Exception (expected): %s" % type(e).__name__)
log.info('Desc ' + e.message['desc'])
log.info('Desc {}'.format(get_ldap_error_msg(e,'desc')))
assert isinstance(e, ldap.UNWILLING_TO_PERFORM)
log.info('Bind as {%s,%s} which should fail with %s.' % (BINDDN, 'bogus', ldap.UNWILLING_TO_PERFORM.__name__))
......@@ -442,7 +473,7 @@ def test_ticket1347760(topology_st):
topology_st.standalone.simple_bind_s(BINDDN, 'bogus')
except ldap.LDAPError as e:
log.info("Exception (expected): %s" % type(e).__name__)
log.info('Desc ' + e.message['desc'])
log.info('Desc {}'.format(get_ldap_error_msg(e,'desc')))
assert isinstance(e, ldap.UNWILLING_TO_PERFORM)
log.info('SUCCESS')
......@@ -453,3 +484,4 @@ if __name__ == '__main__':
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s %s" % CURRENT_FILE)
......@@ -46,18 +46,14 @@ def _oc_definition(oid_ext, name, must=None, may=None):
may = MAY
new_oc = "( %s NAME '%s' DESC '%s' SUP %s AUXILIARY MUST %s MAY %s )" % (oid, name, desc, sup, must, may)
return new_oc
return ensure_bytes(new_oc)
def test_ticket47653_init(topology_st):
"""
It adds
- Objectclass with MAY 'member'
- an entry ('bind_entry') with which we bind to test the 'SELFDN' operation
It deletes the anonymous aci
"""
@pytest.fixture(scope="module")
def allow_user_init(topology_st):
"""Initialize the test environment
"""
topology_st.standalone.log.info("Add %s that allows 'member' attribute" % OC_NAME)
new_oc = _oc_definition(2, OC_NAME, must=MUST, may=MAY)
topology_st.standalone.schema.add_schema('objectClasses', new_oc)
......@@ -71,7 +67,7 @@ def test_ticket47653_init(topology_st):
'userpassword': BIND_PW})))
# enable acl error logging
mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', '128')]
mod = [(ldap.MOD_REPLACE, 'nsslapd-errorlog-level', b'128')]
topology_st.standalone.modify_s(DN_CONFIG, mod)
# Remove aci's to start with a clean slate
......@@ -87,13 +83,25 @@ def test_ticket47653_init(topology_st):
'cn': name})))
def test_ticket47653_add(topology_st):
'''
It checks that, bound as bind_entry,
- we can not ADD an entry without the proper SELFDN aci.
- with the proper ACI we can not ADD with 'member' attribute
- with the proper ACI and 'member' it succeeds to ADD
'''
@pytest.mark.ds47653
def test_selfdn_permission_add(topology_st, allow_user_init):
"""Check add entry operation with and without SelfDN aci
:id: e837a9ef-be92-48da-ad8b-ebf42b0fede1
:setup: Standalone instance, add a entry which is used to bind,
enable acl error logging by setting 'nsslapd-errorlog-level' to '128',
remove aci's to start with a clean slate, and add dummy entries
:steps:
1. Check we can not ADD an entry without the proper SELFDN aci
2. Check with the proper ACI we can not ADD with 'member' attribute
3. Check entry to add with memberS and with the ACI
4. Check with the proper ACI and 'member' it succeeds to ADD
:expectedresults:
1. Operation should be successful
2. Operation should be successful
3. Operation should fail with Insufficient Access
4. Operation should be successful
"""
topology_st.standalone.log.info("\n\n######################### ADD ######################\n")
# bind as bind_entry
......@@ -143,7 +151,7 @@ def test_ticket47653_add(topology_st):
ACI_ALLOW = "(version 3.0; acl \"SelfDN add\"; allow (add)"
ACI_SUBJECT = " userattr = \"member#selfDN\";)"
ACI_BODY = ACI_TARGET + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
mod = [(ldap.MOD_ADD, 'aci', ensure_bytes(ACI_BODY))]
topology_st.standalone.modify_s(SUFFIX, mod)
# bind as bind_entry
......@@ -176,12 +184,23 @@ def test_ticket47653_add(topology_st):
topology_st.standalone.add_s(entry_with_member)
def test_ticket47653_search(topology_st):
'''
It checks that, bound as bind_entry,
- we can not search an entry without the proper SELFDN aci.
- adding the ACI, we can search the entry
'''
@pytest.mark.ds47653
def test_selfdn_permission_search(topology_st, allow_user_init):
"""Check search operation with and without SelfDN aci
:id: 06d51ef9-c675-4583-99b2-4852dbda190e
:setup: Standalone instance, add a entry which is used to bind,
enable acl error logging by setting 'nsslapd-errorlog-level' to '128',
remove aci's to start with a clean slate, and add dummy entries
:steps:
1. Check we can not search an entry without the proper SELFDN aci
2. Add proper ACI
3. Check we can search with the proper ACI
:expectedresults:
1. Operation should be successful
2. Operation should be successful
3. Operation should be successful
"""
topology_st.standalone.log.info("\n\n######################### SEARCH ######################\n")
# bind as bind_entry
topology_st.standalone.log.info("Bind as %s" % BIND_DN)
......@@ -202,7 +221,7 @@ def test_ticket47653_search(topology_st):
ACI_ALLOW = "(version 3.0; acl \"SelfDN search-read\"; allow (read, search, compare)"
ACI_SUBJECT = " userattr = \"member#selfDN\";)"
ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
mod = [(ldap.MOD_ADD, 'aci', ensure_bytes(ACI_BODY))]
topology_st.standalone.modify_s(SUFFIX, mod)
# bind as bind_entry
......@@ -215,12 +234,23 @@ def test_ticket47653_search(topology_st):
assert len(ents) == 1
def test_ticket47653_modify(topology_st):
'''
It checks that, bound as bind_entry,
- we can not modify an entry without the proper SELFDN aci.
- adding the ACI, we can modify the entry
'''
@pytest.mark.ds47653
def test_selfdn_permission_modify(topology_st, allow_user_init):
"""Check modify operation with and without SelfDN aci
:id: 97a58844-095f-44b0-9029-dd29a7d83d68
:setup: Standalone instance, add a entry which is used to bind,
enable acl error logging by setting 'nsslapd-errorlog-level' to '128',
remove aci's to start with a clean slate, and add dummy entries
:steps:
1. Check we can not modify an entry without the proper SELFDN aci
2. Add proper ACI
3. Modify the entry and check the modified value
:expectedresults:
1. Operation should be successful
2. Operation should be successful
3. Operation should be successful
"""
# bind as bind_entry
topology_st.standalone.log.info("Bind as %s" % BIND_DN)
topology_st.standalone.simple_bind_s(BIND_DN, BIND_PW)
......@@ -230,7 +260,7 @@ def test_ticket47653_modify(topology_st):
# entry to modify WITH member being BIND_DN but WITHOUT the ACI -> ldap.INSUFFICIENT_ACCESS
try:
topology_st.standalone.log.info("Try to modify %s (aci is missing)" % ENTRY_DN)
mod = [(ldap.MOD_REPLACE, 'postalCode', '9876')]
mod = [(ldap.MOD_REPLACE, 'postalCode', b'9876')]
topology_st.standalone.modify_s(ENTRY_DN, mod)
except Exception as e:
topology_st.standalone.log.info("Exception (expected): %s" % type(e).__name__)
......@@ -246,7 +276,7 @@ def test_ticket47653_modify(topology_st):
ACI_ALLOW = "(version 3.0; acl \"SelfDN write\"; allow (write)"
ACI_SUBJECT = " userattr = \"member#selfDN\";)"
ACI_BODY = ACI_TARGET + ACI_TARGETATTR + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
mod = [(ldap.MOD_ADD, 'aci', ensure_bytes(ACI_BODY))]
topology_st.standalone.modify_s(SUFFIX, mod)
# bind as bind_entry
......@@ -255,20 +285,30 @@ def test_ticket47653_modify(topology_st):
# modify the entry and checks the value
topology_st.standalone.log.info("Try to modify %s. It should succeeds" % ENTRY_DN)
mod = [(ldap.MOD_REPLACE, 'postalCode', '1928')]
mod = [(ldap.MOD_REPLACE, 'postalCode', b'1928')]
topology_st.standalone.modify_s(ENTRY_DN, mod)
ents = topology_st.standalone.search_s(ENTRY_DN, ldap.SCOPE_BASE, 'objectclass=*')
assert len(ents) == 1
assert ents[0].postalCode == '1928'
def test_ticket47653_delete(topology_st):
'''
It checks that, bound as bind_entry,
- we can not delete an entry without the proper SELFDN aci.
- adding the ACI, we can delete the entry
'''
assert ensure_str(ents[0].postalCode) == '1928'
@pytest.mark.ds47653
def test_selfdn_permission_delete(topology_st, allow_user_init):
"""Check delete operation with and without SelfDN aci
:id: 0ec4c0ec-e7b0-4ef1-8373-ab25aae34516
:setup: Standalone instance, add a entry which is used to bind,
enable acl error logging by setting 'nsslapd-errorlog-level' to '128',
remove aci's to start with a clean slate, and add dummy entries
:steps:
1. Check we can not delete an entry without the proper SELFDN aci
2. Add proper ACI
3. Check we can perform delete operation with proper ACI
:expectedresults:
1. Operation should be successful
2. Operation should be successful
"""
topology_st.standalone.log.info("\n\n######################### DELETE ######################\n")
# bind as bind_entry
......@@ -292,14 +332,14 @@ def test_ticket47653_delete(topology_st):
ACI_ALLOW = "(version 3.0; acl \"SelfDN delete\"; allow (delete)"
ACI_SUBJECT = " userattr = \"member#selfDN\";)"
ACI_BODY = ACI_TARGET + ACI_TARGETFILTER + ACI_ALLOW + ACI_SUBJECT
mod = [(ldap.MOD_ADD, 'aci', ACI_BODY)]
mod = [(ldap.MOD_ADD, 'aci', ensure_bytes(ACI_BODY))]
topology_st.standalone.modify_s(SUFFIX, mod)
# bind as bind_entry
topology_st.standalone.log.info("Bind as %s" % BIND_DN)
topology_st.standalone.simple_bind_s(BIND_DN, BIND_PW)
# entry to search with the proper aci
# entry to delete with the proper aci
topology_st.standalone.log.info("Try to delete %s should be successful" % ENTRY_DN)
topology_st.standalone.delete_s(ENTRY_DN)
......
import logging
import pytest
import os
import ldap
from lib389.utils import ds_is_older
from lib389._constants import *
from lib389.plugins import AutoMembershipPlugin, AutoMembershipDefinition, AutoMembershipDefinitions
from lib389._mapped_object import DSLdapObjects, DSLdapObject
from lib389 import agreement
from lib389.idm.user import UserAccount, UserAccounts, TEST_USER_PROPERTIES
from lib389.idm.group import Groups, Group
from lib389.topologies import topology_st as topo
from lib389._constants import DEFAULT_SUFFIX
# Skip on older versions
pytestmark = pytest.mark.skipif(ds_is_older('1.3.7'), reason="Not implemented")
DEBUGGING = os.getenv("DEBUGGING", default=False)
if DEBUGGING:
logging.getLogger(__name__).setLevel(logging.DEBUG)
else:
logging.getLogger(__name__).setLevel(logging.INFO)
log = logging.getLogger(__name__)
@pytest.fixture(scope="module")
def automember_fixture(topo, request):
groups = Groups(topo.standalone, DEFAULT_SUFFIX)
group = groups.create(properties={'cn': 'testgroup'})
automemberplugin = AutoMembershipPlugin(topo.standalone)
automemberplugin.enable()
topo.standalone.restart()
automember_prop = {
'cn': 'testgroup_definition',
'autoMemberScope': 'ou=People,' + DEFAULT_SUFFIX,
'autoMemberFilter': 'objectclass=*',
'autoMemberDefaultGroup': group.dn,
'autoMemberGroupingAttr': 'member:dn',
}
automembers = AutoMembershipDefinitions(topo.standalone, "cn=Auto Membership Plugin,cn=plugins,cn=config")
automember = automembers.create(properties=automember_prop)
return (group, automembers, automember)
def test_automemberscope(automember_fixture, topo):
"""Test if the automember scope is valid
:id: c3d3f250-e7fd-4441-8387-3d24c156e982
:setup: Standalone instance, enabled Auto Membership Plugin
:steps:
1. Create automember with invalid cn that raises
UNWILLING_TO_PERFORM exception
2. If exception raised, set scope to any cn
3. If exception is not raised, set scope to with ou=People
:expectedresults:
1. Should be success
2. Should be success
3. Should be success
"""
(group, automembers, automember) = automember_fixture
automember_prop = {
'cn': 'anyrandomcn',
'autoMemberScope': 'ou=People,' + DEFAULT_SUFFIX,
'autoMemberFilter': 'objectclass=*',
'autoMemberDefaultGroup': group.dn,
'autoMemberGroupingAttr': 'member:dn',
}
# depends on issue #49465
# with pytest.raises(ldap.UNWILLING_TO_PERFORM):
# automember = automembers.create(properties=automember_prop)
# automember.set_scope("cn=No Entry,%s" % DEFAULT_SUFFIX)
automember.set_scope("ou=People,%s" % DEFAULT_SUFFIX)
def test_automemberfilter(automember_fixture, topo):
"""Test if the automember filter is valid
:id: 935c55de-52dc-4f80-b7dd-3aacd30f6df2
:setup: Standalone instance, enabled Auto Membership Plugin
:steps:
1. Create automember with invalid filter that raises
UNWILLING_TO_PERFORM exception
2. If exception raised, set filter to the invalid filter
3. If exception is not raised, set filter as all objectClasses
:expectedresults:
1. Should be success
2. Should be success
3. Should be success
"""
(group, automembers, automember) = automember_fixture
automember_prop = {
'cn': 'anyrandomcn',
'autoMemberScope': 'ou=People,' + DEFAULT_SUFFIX,
'autoMemberFilter': '(ou=People',
'autoMemberDefaultGroup': group.dn,
'autoMemberGroupingAttr': 'member:dn',
}
with pytest.raises(ldap.UNWILLING_TO_PERFORM):
automember = automembers.create(properties=automember_prop)
automember.set_filter("(ou=People")
automember.set_filter("objectClass=*")
def test_adduser(automember_fixture, topo):
"""Test if member is automatically added to the group
:id: 14f1e2f5-2162-41ab-962c-5293516baf2e
:setup: Standalone instance, enabled Auto Membership Plugin
:steps:
1. Create a user
2. Assert that the user is member of the group
:expectedresults:
1. Should be success
2. Should be success
"""
(group, automembers, automember) = automember_fixture
users = UserAccounts(topo.standalone, DEFAULT_SUFFIX)
user = users.create(properties=TEST_USER_PROPERTIES)
assert group.is_member(user.dn)
......@@ -12,13 +12,13 @@
"""
from subprocess import check_output, Popen
from lib389.idm.user import UserAccounts
import pytest
from lib389.tasks import *
from lib389.utils import *
from lib389.topologies import topology_st
from lib389.dbgen import dbgen
from lib389.idm.organizationalunit import OrganizationalUnits
from lib389._constants import DN_DM, PASSWORD, PW_DM
from lib389.topologies import topology_st
......@@ -44,7 +44,7 @@ def import_example_ldif(topology_st):
log.info('Initializing the "basic" test suite')
ldif = '%s/Example.ldif' % get_data_dir(topology_st.standalone.prefix)
ldif = '%s/dirsrv/data/Example.ldif' % topology_st.standalone.get_data_dir()
import_ldif = topology_st.standalone.get_ldif_dir() + "/Example.ldif"
shutil.copyfile(ldif, import_ldif)
topology_st.standalone.tasks.importLDIF(suffix=DEFAULT_SUFFIX,
......@@ -298,7 +298,7 @@ def test_basic_import_export(topology_st, import_example_ldif):
#
# Cleanup - Import the Example LDIF for the other tests in this suite
#
ldif = '%s/Example.ldif' % get_data_dir(topology_st.standalone.prefix)
ldif = '%s/dirsrv/data/Example.ldif' % topology_st.standalone.get_data_dir()
import_ldif = topology_st.standalone.get_ldif_dir() + "/Example.ldif"
shutil.copyfile(ldif, import_ldif)
try:
......@@ -366,6 +366,25 @@ def test_basic_backup(topology_st, import_example_ldif):
log.info('test_basic_backup: PASSED')
def test_basic_db2index(topology_st, import_example_ldif):
"""Assert db2index can operate correctly.
:id: 191fc0fd-9722-46b5-a7c3-e8760effe119
:setup: Standalone instance
:steps:
1: call db2index
:expectedresults:
1: Index succeeds.
"""
topology_st.standalone.stop()
topology_st.standalone.db2index()
topology_st.standalone.db2index(suffixes=[DEFAULT_SUFFIX], attrs=['uid'])
topology_st.standalone.start()
def test_basic_acl(topology_st, import_example_ldif):
"""Run some basic access control (ACL) tests
......@@ -580,7 +599,7 @@ def test_basic_referrals(topology_st, import_example_ldif):
:setup: Standalone instance
:steps:
1. Set the referral and the backenidealyd state
1. Set the referral and the backend state
2. Set backend state to referral mode.
3. Set server to not follow referral.
4. Search using referral.
......@@ -868,6 +887,135 @@ adds nsslapd-return-default-opattr attr with value of one operation attribute.
log.fatal('Search failed, error: ' + e.message['desc'])
assert False
@pytest.fixture(scope="module")
def test_users(topology_st):
"""Add users to the default suffix
"""
users = UserAccounts(topology_st.standalone, DEFAULT_SUFFIX)
user_names = ["Directory", "Server", "389", "lib389", "pytest"]
log.info('Adding 5 test users')
for name in user_names:
user = users.create(properties={
'uid': name,
'sn': name,
'cn': name,
'uidNumber': '1000',
'gidNumber': '1000',
'homeDirectory': '/home/%s' % name,
'mail': '%s@example.com' % name,
'userpassword': 'pass%s' % name,
})
def test_basic_anonymous_search(topology_st, test_users):
"""Tests basic anonymous search operations
:id: c7831e04-f458-4e50-83c7-b6f77109f639
:setup: Standalone instance
Add 5 test users with different user names
:steps:
1. Execute anonymous search with different filters
:expectedresults:
1. Search should be successful
"""
filters = ["uid=Directory", "(|(uid=S*)(uid=3*))", "(&(uid=l*)(mail=l*))", "(&(!(uid=D*))(ou=People))"]
log.info("Execute anonymous search with different filters")
for filtr in filters:
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, filtr)
assert len(entries) != 0
@pytest.mark.ds604
@pytest.mark.bz915801
def test_search_original_type(topology_st, test_users):
"""Test ldapsearch returning original attributes
using nsslapd-search-return-original-type-switch
:id: d7831d04-f558-4e50-93c7-b6f77109f640
:setup: Standalone instance
Add some test entries
:steps:
1. Set nsslapd-search-return-original-type-switch to ON
2. Check that ldapsearch *does* return unknown attributes
3. Turn off nsslapd-search-return-original-type-switch
4. Check that ldapsearch doesn't return any unknown attributes
:expectedresults:
1. nsslapd-search-return-original-type-switch should be set to ON
2. ldapsearch should return unknown attributes
3. nsslapd-search-return-original-type-switch should be OFF
4. ldapsearch should not return any unknown attributes
"""
log.info("Set nsslapd-search-return-original-type-switch to ON")
topology_st.standalone.config.set('nsslapd-search-return-original-type-switch', 'on')
log.info("Check that ldapsearch *does* return unknown attributes")
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, 'uid=Directory',
['objectclass overflow', 'unknown'])
assert "objectclass overflow" in entries[0].getAttrs()
log.info("Set nsslapd-search-return-original-type-switch to Off")
topology_st.standalone.config.set('nsslapd-search-return-original-type-switch', 'off')
log.info("Check that ldapsearch *does not* return unknown attributes")
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, 'uid=Directory',
['objectclass overflow', 'unknown'])
assert "objectclass overflow" not in entries[0].getAttrs()
@pytest.mark.bz192901
def test_search_ou(topology_st):
"""Test that DS should not return an entry that does not match the filter
:id: d7831d05-f117-4e89-93c7-b6f77109f640
:setup: Standalone instance
:steps:
1. Create an OU entry without sub entries
2. Search from the OU with the filter that does not match the OU
:expectedresults:
1. Creation of OU should be successful
2. Search should not return any results
"""
log.info("Create a test OU without sub entries")
ou = OrganizationalUnits(topology_st.standalone, DEFAULT_SUFFIX)
ou.create(properties={
'ou': 'test_ou',
})
search_base = ("ou=test_ou,%s" % DEFAULT_SUFFIX)
log.info("Search from the OU with the filter that does not match the OU, it should not return anything")
entries = topology_st.standalone.search_s(search_base, ldap.SCOPE_SUBTREE, 'uid=*', ['dn'])
assert len(entries) == 0
@pytest.mark.bz1044135
@pytest.mark.ds47319
def test_connection_buffer_size(topology_st):
"""Test connection buffer size adjustable with different values(valid values and invalid)
:id: e7831d05-f117-4ec9-1203-b6f77109f117
:setup: Standalone instance
:steps:
1. Set nsslapd-connection-buffer to some valid values (2, 0 , 1)
2. Set nsslapd-connection-buffer to some invalid values (-1, a)
:expectedresults:
1. This should pass
2. This should fail
"""
valid_values = ['2', '0', '1']
for value in valid_values:
topology_st.standalone.config.replace('nsslapd-connection-buffer', value)
invalid_values = ['-1', 'a']
for value in invalid_values:
with pytest.raises(ldap.OPERATIONS_ERROR):
topology_st.standalone.config.replace('nsslapd-connection-buffer', value)
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
......
......@@ -12,23 +12,17 @@ from lib389.tasks import *
from lib389.utils import *
from lib389.topologies import topology_st
from lib389.plugins import SevenBitCheckPlugin, AttributeUniquenessPlugin, MemberOfPlugin
from lib389.idm.user import UserAccounts, TEST_USER_PROPERTIES
from lib389.idm.group import Groups
from lib389._constants import DEFAULT_SUFFIX, PLUGIN_7_BIT_CHECK, PLUGIN_ATTR_UNIQUENESS, PLUGIN_MEMBER_OF
logging.getLogger(__name__).setLevel(logging.DEBUG)
log = logging.getLogger(__name__)
@pytest.fixture(scope='module')
def dynamic_plugins(topology_st):
"""Enable dynamic plugins - makes plugin testing much easier"""
try:
topology_st.standalone.modify_s(DN_CONFIG, [(ldap.MOD_REPLACE, 'nsslapd-dynamic-plugins', 'on')])
except ldap.LDAPError as e:
ldap.error('Failed to enable dynamic plugin!' + e.message['desc'])
assert False
def test_betxt_7bit(topology_st, dynamic_plugins):
def test_betxt_7bit(topology_st):
"""Test that the 7-bit plugin correctly rejects an invalid update
:id: 9e2ab27b-eda9-4cd9-9968-a1a8513210fd
......@@ -51,55 +45,39 @@ def test_betxt_7bit(topology_st, dynamic_plugins):
log.info('Running test_betxt_7bit...')
USER_DN = 'uid=test_entry,' + DEFAULT_SUFFIX
eight_bit_rdn = six.u('uid=Fu\u00c4\u00e8')
BAD_RDN = eight_bit_rdn.encode('utf-8')
BAD_RDN = u'uid=Fu\u00c4\u00e8'
# This plugin should on by default, but just in case...
topology_st.standalone.plugins.enable(name=PLUGIN_7_BIT_CHECK)
sevenbc = SevenBitCheckPlugin(topology_st.standalone)
sevenbc.enable()
topology_st.standalone.restart()
# Add our test user
try:
topology_st.standalone.add_s(Entry((USER_DN, {'objectclass': "top extensibleObject".split(),
'sn': '1',
'cn': 'test 1',
'uid': 'test_entry',
'userpassword': 'password'})))
except ldap.LDAPError as e:
log.error('Failed to add test user' + USER_DN + ': error ' + e.message['desc'])
assert False
users = UserAccounts(topology_st.standalone, basedn=DEFAULT_SUFFIX)
user = users.create(properties=TEST_USER_PROPERTIES)
# Attempt a modrdn, this should fail
try:
topology_st.standalone.rename_s(USER_DN, BAD_RDN, delold=0)
user.rename(BAD_RDN)
log.fatal('test_betxt_7bit: Modrdn operation incorrectly succeeded')
assert False
except ldap.LDAPError as e:
log.info('Modrdn failed as expected: error ' + e.message['desc'])
log.info('Modrdn failed as expected: error %s' % str(e))
# Make sure the operation did not succeed, attempt to search for the new RDN
try:
entries = topology_st.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, BAD_RDN)
if entries:
log.fatal('test_betxt_7bit: Incorrectly found the entry using the invalid RDN')
assert False
except ldap.LDAPError as e:
log.fatal('Error while searching for test entry: ' + e.message['desc'])
assert False
user_check = users.get("testuser")
assert user_check.dn == user.dn
#
# Cleanup - remove the user
#
try:
topology_st.standalone.delete_s(USER_DN)
except ldap.LDAPError as e:
log.fatal('Failed to delete test entry: ' + e.message['desc'])
assert False
user.delete()
log.info('test_betxt_7bit: PASSED')
def test_betxn_attr_uniqueness(topology_st, dynamic_plugins):
def test_betxn_attr_uniqueness(topology_st):
"""Test that we can not add two entries that have the same attr value that is
defined by the plugin
......@@ -124,50 +102,40 @@ def test_betxn_attr_uniqueness(topology_st, dynamic_plugins):
USER1_DN = 'uid=test_entry1,' + DEFAULT_SUFFIX
USER2_DN = 'uid=test_entry2,' + DEFAULT_SUFFIX
topology_st.standalone.plugins.enable(name=PLUGIN_ATTR_UNIQUENESS)
attruniq = AttributeUniquenessPlugin(topology_st.standalone)
attruniq.enable()
topology_st.standalone.restart()
# Add the first entry
try:
topology_st.standalone.add_s(Entry((USER1_DN, {'objectclass': "top extensibleObject".split(),
'sn': '1',
'cn': 'test 1',
'uid': 'test_entry1',
'userpassword': 'password1'})))
except ldap.LDAPError as e:
log.fatal('test_betxn_attr_uniqueness: Failed to add test user: ' +
USER1_DN + ', error ' + e.message['desc'])
assert False
users = UserAccounts(topology_st.standalone, basedn=DEFAULT_SUFFIX)
user1 = users.create(properties={
'uid': 'testuser1',
'cn' : 'testuser1',
'sn' : 'user1',
'uidNumber' : '1001',
'gidNumber' : '2001',
'homeDirectory' : '/home/testuser1'
})
# Add the second entry with a duplicate uid
try:
topology_st.standalone.add_s(Entry((USER2_DN, {'objectclass': "top extensibleObject".split(),
'sn': '2',
'cn': 'test 2',
'uid': 'test_entry2',
'uid': 'test_entry1', # Duplicate value
'userpassword': 'password2'})))
user2 = users.create(properties={
'uid': ['testuser2', 'testuser1'],
'cn' : 'testuser2',
'sn' : 'user2',
'uidNumber' : '1002',
'gidNumber' : '2002',
'homeDirectory' : '/home/testuser2'
})
log.fatal('test_betxn_attr_uniqueness: The second entry was incorrectly added.')
assert False
except ldap.LDAPError as e:
log.error('test_betxn_attr_uniqueness: Failed to add test user as expected: ' +
USER1_DN + ', error ' + e.message['desc'])
log.error('test_betxn_attr_uniqueness: Failed to add test user as expected:')
#
# Cleanup - disable plugin, remove test entry
#
topology_st.standalone.plugins.disable(name=PLUGIN_ATTR_UNIQUENESS)
try:
topology_st.standalone.delete_s(USER1_DN)
except ldap.LDAPError as e:
log.fatal('test_betxn_attr_uniqueness: Failed to delete test entry1: ' +
e.message['desc'])
assert False
user1.delete()
log.info('test_betxn_attr_uniqueness: PASSED')
def test_betxn_memberof(topology_st, dynamic_plugins):
def test_betxn_memberof(topology_st):
"""Test PLUGIN_MEMBER_OF plugin
:id: 70d0b96e-b693-4bf7-bbf5-102a66ac5993
......@@ -192,55 +160,34 @@ def test_betxn_memberof(topology_st, dynamic_plugins):
ENTRY2_DN = 'cn=group2,' + DEFAULT_SUFFIX
PLUGIN_DN = 'cn=' + PLUGIN_MEMBER_OF + ',cn=plugins,cn=config'
# Enable and configure memberOf plugin
topology_st.standalone.plugins.enable(name=PLUGIN_MEMBER_OF)
try:
topology_st.standalone.modify_s(PLUGIN_DN, [(ldap.MOD_REPLACE, 'memberofgroupattr', 'member'),
(ldap.MOD_REPLACE, 'memberofAutoAddOC', 'referral')])
except ldap.LDAPError as e:
log.fatal('test_betxn_memberof: Failed to update config(member): error ' + e.message['desc'])
assert False
memberof = MemberOfPlugin(topology_st.standalone)
memberof.enable()
memberof.set_autoaddoc('referral')
# memberof.add_groupattr('member') # This is already the default.
topology_st.standalone.restart()
# Add our test entries
try:
topology_st.standalone.add_s(Entry((ENTRY1_DN, {'objectclass': "top groupofnames".split(),
'cn': 'group1'})))
except ldap.LDAPError as e:
log.error('test_betxn_memberof: Failed to add group1:' +
ENTRY1_DN + ', error ' + e.message['desc'])
assert False
groups = Groups(topology_st.standalone, DEFAULT_SUFFIX)
group1 = groups.create(properties={
'cn' : 'group1',
})
try:
topology_st.standalone.add_s(Entry((ENTRY2_DN, {'objectclass': "top groupofnames".split(),
'cn': 'group1'})))
except ldap.LDAPError as e:
log.error('test_betxn_memberof: Failed to add group2:' +
ENTRY2_DN + ', error ' + e.message['desc'])
assert False
#
# Test mod replace
#
# Add group2 to group1 - it should fail with objectclass violation
try:
topology_st.standalone.modify_s(ENTRY1_DN, [(ldap.MOD_REPLACE, 'member', ENTRY2_DN)])
log.fatal('test_betxn_memberof: Group2 was incorrectly allowed to be added to group1')
assert False
except ldap.LDAPError as e:
log.info('test_betxn_memberof: Group2 was correctly rejected (mod replace): error ' + e.message['desc'])
group2 = groups.create(properties={
'cn' : 'group2',
})
#
# Test mod add
#
# We may need to mod groups to not have nsMemberOf ... ?
if not ds_is_older('1.3.7'):
group1.remove('objectClass', 'nsMemberOf')
group2.remove('objectClass', 'nsMemberOf')
# Add group2 to group1 - it should fail with objectclass violation
try:
group1.add_member(group2.dn)
topology_st.standalone.modify_s(ENTRY1_DN, [(ldap.MOD_ADD, 'member', ENTRY2_DN)])
log.fatal('test_betxn_memberof: Group2 was incorrectly allowed to be added to group1')
assert False
except ldap.LDAPError as e:
log.info('test_betxn_memberof: Group2 was correctly rejected (mod add): error ' + e.message['desc'])
log.info('test_betxn_memberof: Group2 was correctly rejected (mod add): error')
#
# Done
......
......@@ -6,7 +6,8 @@
# See LICENSE for details.
# --- END COPYRIGHT BLOCK ---
#
import time
import subprocess
import pytest
from lib389.tasks import *
from lib389.utils import *
......@@ -37,7 +38,6 @@ def test_clu_pwdhash(topology_st):
log.info('Running test_clu_pwdhash...')
cmd = '%s -s ssha testpassword' % os.path.join(topology_st.standalone.get_bin_dir(), 'pwdhash')
p = os.popen(cmd)
result = p.readline()
p.close()
......@@ -49,14 +49,45 @@ def test_clu_pwdhash(topology_st):
if len(result) < 20:
log.fatal('test_clu_pwdhash: Encrypted password is too short')
assert False
log.info('pwdhash generated: ' + result)
log.info('test_clu_pwdhash: PASSED')
def test_clu_pwdhash_mod(topology_st):
"""Test the pwdhash script output with -D configdir
:id: 874ab5e2-207b-4a95-b4c0-22d97b8ab643
:setup: Standalone instance
:steps:
1. Set nsslapd-rootpwstoragescheme & passwordStorageScheme to SSHA256 & SSHA384 respectively
2. Execute /usr/bin/pwdhash -D /etc/dirsrv/slapd-instance_name/ <password>
3. Check if there is any output
4. Check if the command returns the hashed string using the algorithm set in nsslapd-rootpwstoragescheme
:expectedresults:
1. nsslapd-rootpwstoragescheme & passwordStorageScheme should set to SSHA256 & SSHA384 respectively
2. Execution should PASS
3. There should be an output from the command
4. Command should return the hashed string using the algorithm set in nsslapd-rootpwstoragescheme
"""
log.info('Running test_clu_pwdhash_mod...')
topology_st.standalone.config.set('nsslapd-rootpwstoragescheme', 'SSHA256')
topology_st.standalone.config.set('passwordStorageScheme', 'SSHA384')
cmd = [os.path.join(topology_st.standalone.get_bin_dir(), 'pwdhash'), '-D', '/etc/dirsrv/slapd-standalone1',
'password']
result = subprocess.check_output(cmd)
stdout = ensure_str(result)
assert result, "Failed to run pwdhash"
assert 'SSHA256' in stdout
log.info('pwdhash generated: ' + stdout)
log.info('returned the hashed string using the algorithm set in nsslapd-rootpwstoragescheme')
if __name__ == '__main__':
# Run isolated
# -s for DEBUG mode
CURRENT_FILE = os.path.realpath(__file__)
pytest.main("-s %s" % CURRENT_FILE)