248 lines
9.6 KiB
Diff
248 lines
9.6 KiB
Diff
From df599b6b79010759279eb7f52486f1d0a59d06d3 Mon Sep 17 00:00:00 2001
|
|
From: Andrew Bartlett <abartlet@samba.org>
|
|
Date: Mon, 8 Jun 2020 16:32:14 +1200
|
|
Subject: [PATCH 22/22] CVE-2020-10760 dsdb: Add tests for paged_results and
|
|
VLV over the Global Catalog port
|
|
|
|
These tests should help catch any future regression in paged_results or VLV handling over the Global Catalog port.
|
|
|
|
(backported from master patch)
|
|
[abartlet@samba.org: sort=True parameter on test_paged_delete_during_search
|
|
is not in 4.11]
|
|
|
|
Signed-off-by: Andrew Bartlett <abartlet@samba.org>
|
|
---
|
|
selftest/knownfail.d/vlv | 2 +-
|
|
source4/dsdb/tests/python/vlv.py | 171 +++++++++++++++++++------------
|
|
2 files changed, 107 insertions(+), 66 deletions(-)
|
|
|
|
diff --git a/selftest/knownfail.d/vlv b/selftest/knownfail.d/vlv
|
|
index f187a2ed55e..7ae02baf17b 100644
|
|
--- a/selftest/knownfail.d/vlv
|
|
+++ b/selftest/knownfail.d/vlv
|
|
@@ -1,2 +1,2 @@
|
|
samba4.ldap.vlv.python.*__main__.VLVTests.test_vlv_change_search_expr
|
|
-samba4.ldap.vlv.python.*__main__.PagedResultsTests.test_paged_cant_change_controls_data
|
|
+samba4.ldap.vlv.python.*__main__.PagedResultsTestsRW.test_paged_cant_change_controls_data
|
|
diff --git a/source4/dsdb/tests/python/vlv.py b/source4/dsdb/tests/python/vlv.py
|
|
index f3c603e3a39..ba03b425a5b 100644
|
|
--- a/source4/dsdb/tests/python/vlv.py
|
|
+++ b/source4/dsdb/tests/python/vlv.py
|
|
@@ -152,7 +152,7 @@ class TestsWithUserOU(samba.tests.TestCase):
|
|
super(TestsWithUserOU, self).setUp()
|
|
self.ldb = SamDB(host, credentials=creds,
|
|
session_info=system_session(lp), lp=lp)
|
|
-
|
|
+ self.ldb_ro = self.ldb
|
|
self.base_dn = self.ldb.domain_dn()
|
|
self.tree_dn = "ou=vlvtesttree,%s" % self.base_dn
|
|
self.ou = "ou=vlvou,%s" % self.tree_dn
|
|
@@ -199,8 +199,60 @@ class TestsWithUserOU(samba.tests.TestCase):
|
|
self.ldb.delete(self.tree_dn, ['tree_delete:1'])
|
|
|
|
|
|
-class VLVTests(TestsWithUserOU):
|
|
+class VLVTestsBase(TestsWithUserOU):
|
|
+
|
|
+ # Run a vlv search and return important fields of the response control
|
|
+ def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
|
|
+ sort_ctrl = "server_sort:1:0:%s" % attr
|
|
+ ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
|
|
+ if cookie:
|
|
+ ctrl += ":" + cookie
|
|
+
|
|
+ res = self.ldb_ro.search(self.ou,
|
|
+ expression=expr,
|
|
+ scope=ldb.SCOPE_ONELEVEL,
|
|
+ attrs=[attr],
|
|
+ controls=[ctrl, sort_ctrl])
|
|
+ results = [str(x[attr][0]) for x in res]
|
|
+
|
|
+ ctrls = [str(c) for c in res.controls if
|
|
+ str(c).startswith('vlv')]
|
|
+ self.assertEqual(len(ctrls), 1)
|
|
+
|
|
+ spl = ctrls[0].rsplit(':')
|
|
+ cookie = ""
|
|
+ if len(spl) == 6:
|
|
+ cookie = spl[-1]
|
|
+
|
|
+ return results, cookie
|
|
+
|
|
+
|
|
+class VLVTestsRO(VLVTestsBase):
|
|
+ def test_vlv_simple_double_run(self):
|
|
+ """Do the simplest possible VLV query to confirm if VLV
|
|
+ works at all. Useful for showing VLV as a whole works
|
|
+ on Global Catalog (for example)"""
|
|
+ attr = 'roomNumber'
|
|
+ expr = "(objectclass=user)"
|
|
|
|
+ # Start new search
|
|
+ full_results, cookie = self.vlv_search(attr, expr,
|
|
+ after_count=len(self.users))
|
|
+
|
|
+ results, cookie = self.vlv_search(attr, expr, cookie=cookie,
|
|
+ after_count=len(self.users))
|
|
+ expected_results = full_results
|
|
+ self.assertEqual(results, expected_results)
|
|
+
|
|
+
|
|
+class VLVTestsGC(VLVTestsRO):
|
|
+ def setUp(self):
|
|
+ super(VLVTestsRO, self).setUp()
|
|
+ self.ldb_ro = SamDB(host + ":3268", credentials=creds,
|
|
+ session_info=system_session(lp), lp=lp)
|
|
+
|
|
+
|
|
+class VLVTests(VLVTestsBase):
|
|
def get_full_list(self, attr, include_cn=False):
|
|
"""Fetch the whole list sorted on the attribute, using the VLV.
|
|
This way you get a VLV cookie."""
|
|
@@ -1081,31 +1133,6 @@ class VLVTests(TestsWithUserOU):
|
|
controls=[sort_control,
|
|
"vlv:0:1:1:1:0:%s" % vlv_cookies[-1]])
|
|
|
|
- # Run a vlv search and return important fields of the response control
|
|
- def vlv_search(self, attr, expr, cookie="", after_count=0, offset=1):
|
|
- sort_ctrl = "server_sort:1:0:%s" % attr
|
|
- ctrl = "vlv:1:0:%d:%d:0" % (after_count, offset)
|
|
- if cookie:
|
|
- ctrl += ":" + cookie
|
|
-
|
|
- res = self.ldb.search(self.ou,
|
|
- expression=expr,
|
|
- scope=ldb.SCOPE_ONELEVEL,
|
|
- attrs=[attr],
|
|
- controls=[ctrl, sort_ctrl])
|
|
- results = [str(x[attr][0]) for x in res]
|
|
-
|
|
- ctrls = [str(c) for c in res.controls if
|
|
- str(c).startswith('vlv')]
|
|
- self.assertEqual(len(ctrls), 1)
|
|
-
|
|
- spl = ctrls[0].rsplit(':')
|
|
- cookie = ""
|
|
- if len(spl) == 6:
|
|
- cookie = spl[-1]
|
|
-
|
|
- return results, cookie
|
|
-
|
|
def test_vlv_modify_during_view(self):
|
|
attr = 'roomNumber'
|
|
expr = "(objectclass=user)"
|
|
@@ -1218,11 +1245,11 @@ class PagedResultsTests(TestsWithUserOU):
|
|
if subtree:
|
|
scope = ldb.SCOPE_SUBTREE
|
|
|
|
- res = self.ldb.search(ou,
|
|
- expression=expr,
|
|
- scope=scope,
|
|
- controls=controls,
|
|
- **kwargs)
|
|
+ res = self.ldb_ro.search(ou,
|
|
+ expression=expr,
|
|
+ scope=scope,
|
|
+ controls=controls,
|
|
+ **kwargs)
|
|
results = [str(r['cn'][0]) for r in res]
|
|
|
|
ctrls = [str(c) for c in res.controls if
|
|
@@ -1235,6 +1262,53 @@ class PagedResultsTests(TestsWithUserOU):
|
|
cookie = spl[-1]
|
|
return results, cookie
|
|
|
|
+
|
|
+class PagedResultsTestsRO(PagedResultsTests):
|
|
+
|
|
+ def test_paged_search_lockstep(self):
|
|
+ expr = "(objectClass=*)"
|
|
+ ps = 3
|
|
+
|
|
+ all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
|
|
+
|
|
+ # Run two different but overlapping paged searches simultaneously.
|
|
+ set_1_index = int((len(all_results))//3)
|
|
+ set_2_index = int((2*len(all_results))//3)
|
|
+ set_1 = all_results[set_1_index:]
|
|
+ set_2 = all_results[:set_2_index+1]
|
|
+ set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
|
|
+ set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
|
|
+
|
|
+ results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
|
|
+ self.assertEqual(results, set_1[:ps])
|
|
+ results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
|
|
+ self.assertEqual(results, set_2[:ps])
|
|
+
|
|
+ results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
|
|
+ page_size=ps)
|
|
+ self.assertEqual(results, set_1[ps:ps*2])
|
|
+ results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
|
|
+ page_size=ps)
|
|
+ self.assertEqual(results, set_2[ps:ps*2])
|
|
+
|
|
+ results, _ = self.paged_search(set_1_expr, cookie=cookie1,
|
|
+ page_size=len(self.users))
|
|
+ self.assertEqual(results, set_1[ps*2:])
|
|
+ results, _ = self.paged_search(set_2_expr, cookie=cookie2,
|
|
+ page_size=len(self.users))
|
|
+ self.assertEqual(results, set_2[ps*2:])
|
|
+
|
|
+
|
|
+class PagedResultsTestsGC(PagedResultsTestsRO):
|
|
+
|
|
+ def setUp(self):
|
|
+ super(PagedResultsTestsRO, self).setUp()
|
|
+ self.ldb_ro = SamDB(host + ":3268", credentials=creds,
|
|
+ session_info=system_session(lp), lp=lp)
|
|
+
|
|
+
|
|
+class PagedResultsTestsRW(PagedResultsTests):
|
|
+
|
|
def test_paged_delete_during_search(self):
|
|
expr = "(objectClass=*)"
|
|
|
|
@@ -1611,39 +1685,6 @@ class PagedResultsTests(TestsWithUserOU):
|
|
cookie, attrs=changed_attrs,
|
|
extra_ctrls=[])
|
|
|
|
- def test_paged_search_lockstep(self):
|
|
- expr = "(objectClass=*)"
|
|
- ps = 3
|
|
-
|
|
- all_results, _ = self.paged_search(expr, page_size=len(self.users)+1)
|
|
-
|
|
- # Run two different but overlapping paged searches simultaneously.
|
|
- set_1_index = int((len(all_results))//3)
|
|
- set_2_index = int((2*len(all_results))//3)
|
|
- set_1 = all_results[set_1_index:]
|
|
- set_2 = all_results[:set_2_index+1]
|
|
- set_1_expr = "(cn>=%s)" % (all_results[set_1_index])
|
|
- set_2_expr = "(cn<=%s)" % (all_results[set_2_index])
|
|
-
|
|
- results, cookie1 = self.paged_search(set_1_expr, page_size=ps)
|
|
- self.assertEqual(results, set_1[:ps])
|
|
- results, cookie2 = self.paged_search(set_2_expr, page_size=ps)
|
|
- self.assertEqual(results, set_2[:ps])
|
|
-
|
|
- results, cookie1 = self.paged_search(set_1_expr, cookie=cookie1,
|
|
- page_size=ps)
|
|
- self.assertEqual(results, set_1[ps:ps*2])
|
|
- results, cookie2 = self.paged_search(set_2_expr, cookie=cookie2,
|
|
- page_size=ps)
|
|
- self.assertEqual(results, set_2[ps:ps*2])
|
|
-
|
|
- results, _ = self.paged_search(set_1_expr, cookie=cookie1,
|
|
- page_size=len(self.users))
|
|
- self.assertEqual(results, set_1[ps*2:])
|
|
- results, _ = self.paged_search(set_2_expr, cookie=cookie2,
|
|
- page_size=len(self.users))
|
|
- self.assertEqual(results, set_2[ps*2:])
|
|
-
|
|
def test_vlv_paged(self):
|
|
"""Testing behaviour with VLV and paged_results set.
|
|
|
|
--
|
|
2.17.1
|
|
|