Commit f4bb1bea authored by Tim Burke's avatar Tim Burke

reconciler: Enqueue right work for shard containers

This fixes newly-enqueued work going forward, but doesn't offer anything
to try to parse existing bad work items or even to kick shards so they
reset their reconciler high-water marks.

Change-Id: I79d20209cea70a6447c4e94941e5e854889cbec5
Closes-Bug: 1836082
parent e62f07d9
......@@ -159,8 +159,8 @@ def get_reconciler_content_type(op):
def get_row_to_q_entry_translator(broker):
account = broker.account
container = broker.container
account = broker.root_account
container = broker.root_container
op_type = {
0: get_reconciler_content_type('put'),
1: get_reconciler_content_type('delete'),
......@@ -924,6 +924,91 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 2)
def test_misplaced_rows_replicate_and_enqueue_from_shard(self):
# force all timestamps to fall in same hour
ts = (Timestamp(t) for t in
itertools.count(int(time.time()) // 3600 * 3600))
policy = random.choice(list(POLICIES))
broker = self._get_broker('.shards_a', 'some-other-c', node_index=0)
broker.initialize(next(ts).internal, policy.idx)
broker.set_sharding_sysmeta('Root', 'a/c')
remote_policy = random.choice([p for p in POLICIES if p is not
remote_broker = self._get_broker(
'.shards_a', 'some-other-c', node_index=1)
remote_broker.initialize(next(ts).internal, remote_policy.idx)
# add a misplaced row to *local* broker
obj_put_timestamp = next(ts).internal
'o', obj_put_timestamp, 0, 'content-type',
'etag', storage_policy_index=remote_policy.idx)
misplaced = broker.get_misplaced_since(-1, 10)
self.assertEqual(len(misplaced), 1)
# since this row is misplaced it doesn't show up in count
self.assertEqual(broker.get_info()['object_count'], 0)
# add another misplaced row to *local* broker with composite timestamp
ts_data = next(ts)
ts_ctype = next(ts)
ts_meta = next(ts)
'o2', ts_data.internal, 0, 'content-type',
'etag', storage_policy_index=remote_policy.idx,
ctype_timestamp=ts_ctype.internal, meta_timestamp=ts_meta.internal)
misplaced = broker.get_misplaced_since(-1, 10)
self.assertEqual(len(misplaced), 2)
# since this row is misplaced it doesn't show up in count
self.assertEqual(broker.get_info()['object_count'], 0)
# replicate
part, node = self._get_broker_part_node(broker)
daemon = self._run_once(node)
# push to remote, and third node was missing (also maybe reconciler)
self.assertTrue(2 < daemon.stats['rsync'] <= 3, daemon.stats['rsync'])
# grab the rsynced instance of remote_broker
remote_broker = self._get_broker(
'.shards_a', 'some-other-c', node_index=1)
# remote has misplaced rows too now
misplaced = remote_broker.get_misplaced_since(-1, 10)
self.assertEqual(len(misplaced), 2)
# and the correct policy_index and object_count
info = remote_broker.get_info()
expectations = {
'object_count': 0,
'storage_policy_index': policy.idx,
for key, value in expectations.items():
self.assertEqual(info[key], value)
# and we should have also enqueued these rows in a single reconciler,
# since we forced the object timestamps to be in the same hour.
reconciler = daemon.get_reconciler_broker(misplaced[0]['created_at'])
# but it may not be on the same node as us anymore though...
reconciler = self._get_broker(reconciler.account,
reconciler.container, node_index=0)
self.assertEqual(reconciler.get_info()['object_count'], 2)
objects = reconciler.list_objects_iter(
10, '', None, None, None, None, storage_policy_index=0)
self.assertEqual(len(objects), 2)
# NB: reconciler work is for the *root* container!
expected = ('%s:/a/c/o' % remote_policy.idx, obj_put_timestamp, 0,
'application/x-put', obj_put_timestamp)
self.assertEqual(objects[0], expected)
# the second object's listing has ts_meta as its last modified time
# but its full composite timestamp is in the hash field.
expected = ('%s:/a/c/o2' % remote_policy.idx, ts_meta.internal, 0,
encode_timestamps(ts_data, ts_ctype, ts_meta))
self.assertEqual(objects[1], expected)
# having safely enqueued to the reconciler we can advance
# our sync pointer
self.assertEqual(broker.get_reconciler_sync(), 2)
def test_multiple_out_sync_reconciler_enqueue_normalize(self):
ts = (Timestamp(t).internal for t in
