"""Unit tests for dense reward components.""" import math import sqlite3 import pytest from server.reward import ( _bin_progress, _cardinality_score, _layer1_operational, _layer2_progress, _numeric_range_score, _value_overlap_score, compute_step_reward, ) from sql_env.models import EpisodeContext, QuestionRecord def _build_question_record() -> QuestionRecord: return QuestionRecord( question_id="q-episode-context", question_text="How many students are there?", database_name="student_assessment", gold_sql="SELECT COUNT(*) FROM students", gold_answer="0", answer_type="integer", difficulty="easy", tables_involved=["students"], ) def _build_episode_context(**kwargs: object) -> EpisodeContext: return EpisodeContext( episode_id="ep-episode-context", db_connection=sqlite3.connect(":memory:"), question_record=_build_question_record(), **kwargs, ) class TestLayer1Operational: def test_layer1_successful_query(self) -> None: context = _build_episode_context() try: reward = _layer1_operational( context, action_type="QUERY", sql="SELECT 1", rows=[(1,)], error=None, ) assert reward == 0.025 finally: context.db_connection.close() def test_layer1_successful_describe(self) -> None: context = _build_episode_context() try: reward = _layer1_operational( context, action_type="DESCRIBE", sql="DESCRIBE students", rows=[("id", "INTEGER")], error=None, ) assert reward == 0.015 finally: context.db_connection.close() def test_layer1_successful_sample(self) -> None: context = _build_episode_context() try: reward = _layer1_operational( context, action_type="SAMPLE", sql="SELECT * FROM students LIMIT 5", rows=[(1,)], error=None, ) assert reward == 0.015 finally: context.db_connection.close() def test_layer1_error_query(self) -> None: context = _build_episode_context() try: reward = _layer1_operational( context, action_type="QUERY", sql="SELECT missing FROM students", rows=None, error="no such column", ) assert reward == -0.005 finally: context.db_connection.close() def test_layer1_new_info_no_cap(self) -> None: """New info is awarded per unique query with no cumulative cap.""" context = _build_episode_context() try: total = 0.0 for idx in range(15): r = _layer1_operational( context, action_type="QUERY", sql=f"SELECT {idx}", rows=[(idx,)], error=None, ) total += r # 15 unique queries: exec_ok(0.02) + new_info(0.01) - cost(0.005) assert total == pytest.approx(15 * 0.025) finally: context.db_connection.close() def test_layer1_repeat_penalty(self) -> None: context = _build_episode_context() try: _layer1_operational( context, action_type="QUERY", sql="SELECT 1", rows=[(1,)], error=None, ) reward = _layer1_operational( context, action_type="QUERY", sql="SELECT 1", rows=[(1,)], error=None, ) assert reward == -0.015 finally: context.db_connection.close() def test_layer1_repeat_no_exec_ok(self) -> None: context = _build_episode_context() try: _layer1_operational( context, action_type="QUERY", sql="SELECT 2", rows=[(2,)], error=None, ) reward = _layer1_operational( context, action_type="QUERY", sql="SELECT 2", rows=[(2,)], error=None, ) assert reward <= -0.005 assert reward == -0.015 finally: context.db_connection.close() def test_layer1_step_cost_always_applied(self) -> None: context = _build_episode_context() try: reward_success = _layer1_operational( context, action_type="SAMPLE", sql="SELECT * FROM students LIMIT 1", rows=[(1,)], error=None, ) reward_error = _layer1_operational( context, action_type="QUERY", sql="SELECT bad", rows=None, error="bad query", ) assert reward_success < 0.02 assert reward_error == -0.005 finally: context.db_connection.close() class TestCardinalityScore: def test_cardinality_exact_match(self) -> None: assert _cardinality_score([(1,), (2,)], [(3,), (4,)]) == 1.0 def test_cardinality_zero_pred(self) -> None: assert _cardinality_score([], [(1,)]) == 0.0 def test_cardinality_zero_gold(self) -> None: assert _cardinality_score([(1,)], []) == 0.0 def test_cardinality_both_empty(self) -> None: assert _cardinality_score([], []) == 1.0 def test_cardinality_pred_larger(self) -> None: pred_rows = [(idx,) for idx in range(10)] assert _cardinality_score(pred_rows, [(1,)]) == pytest.approx(0.1) def test_cardinality_gold_larger(self) -> None: gold_rows = [(idx,) for idx in range(4)] assert _cardinality_score([(1,)], gold_rows) == 0.25 def test_cardinality_returns_float_in_range(self) -> None: score = _cardinality_score([(1,), (2,)], [(1,)]) assert 0.0 <= score <= 1.0 class TestValueOverlapScore: def test_value_overlap_identical(self) -> None: assert _value_overlap_score([(1, "a")], [(1, "a")]) == 1.0 def test_value_overlap_disjoint(self) -> None: assert _value_overlap_score([(1, "x")], [(2, "y")]) == 0.0 def test_value_overlap_partial(self) -> None: score = _value_overlap_score([(1, "a"), (2, "b")], [(1, "a"), (3, "c")]) assert score == pytest.approx(2 / 6) def test_value_overlap_empty_pred(self) -> None: assert _value_overlap_score([], [(1,)]) == 0.0 def test_value_overlap_empty_gold(self) -> None: assert _value_overlap_score([(1,)], []) == 0.0 def test_value_overlap_both_empty(self) -> None: assert _value_overlap_score([], []) == 0.0 def test_value_overlap_stringifies_values(self) -> None: score = _value_overlap_score([(1, 2.5, None)], [(1, 2.5, None)]) assert score == 1.0 def test_value_overlap_returns_float_in_range(self) -> None: score = _value_overlap_score([(1, "a")], [(1, "b")]) assert 0.0 <= score <= 1.0 class TestNumericRangeScore: def test_numeric_range_identical(self) -> None: assert _numeric_range_score([(10,)], [(10,)]) == 1.0 def test_numeric_range_no_numerics_in_gold(self) -> None: assert _numeric_range_score([("a",)], [("b",)]) == 1.0 def test_numeric_range_close_values(self) -> None: score = _numeric_range_score([(11,)], [(10,)]) assert score > 0.5 assert score < 1.0 def test_numeric_range_far_values(self) -> None: score = _numeric_range_score([(1000000,)], [(1,)]) assert score < 0.1 def test_numeric_range_zero_distance(self) -> None: assert _numeric_range_score([(0,)], [(0,)]) == 1.0 def test_numeric_range_negative_numbers(self) -> None: expected = 1.0 / (1.0 + math.log1p(10.0)) score = _numeric_range_score([(-5,)], [(5,)]) assert score == expected def test_numeric_range_mixed_types(self) -> None: assert _numeric_range_score([(10, "a")], [(10, "b")]) == 1.0 def test_numeric_range_empty_pred(self) -> None: assert _numeric_range_score([], [(1,)]) == 0.0 def test_numeric_range_returns_float_in_range(self) -> None: score = _numeric_range_score([(5,), (10,)], [(7,)]) assert 0.0 <= score <= 1.0 class TestBinProgress: def test_bin_progress_zero(self) -> None: assert _bin_progress(0.0) == 0.0 def test_bin_progress_low(self) -> None: assert _bin_progress(0.124) == 0.0 def test_bin_progress_boundary_0125(self) -> None: assert _bin_progress(0.125) == 0.25 def test_bin_progress_mid_low(self) -> None: assert _bin_progress(0.3) == 0.25 def test_bin_progress_boundary_0375(self) -> None: assert _bin_progress(0.375) == 0.5 def test_bin_progress_mid(self) -> None: assert _bin_progress(0.5) == 0.5 def test_bin_progress_boundary_0625(self) -> None: assert _bin_progress(0.625) == 0.75 def test_bin_progress_mid_high(self) -> None: assert _bin_progress(0.7) == 0.75 def test_bin_progress_boundary_0875(self) -> None: assert _bin_progress(0.875) == 1.0 def test_bin_progress_one(self) -> None: assert _bin_progress(1.0) == 1.0 class TestLayer2Progress: def test_layer2_perfect_match(self) -> None: context = _build_episode_context(gold_rows=[(1, "a", 10)]) try: reward = _layer2_progress(context, rows=[(1, "a", 10)]) assert reward == pytest.approx(0.15) assert context.previous_progress == 1.0 finally: context.db_connection.close() def test_layer2_no_change(self) -> None: context = _build_episode_context(gold_rows=[(1, "a", 10)]) try: _layer2_progress(context, rows=[(1, "a", 10)]) reward = _layer2_progress(context, rows=[(1, "a", 10)]) assert reward == 0.0 assert context.previous_progress == 1.0 finally: context.db_connection.close() def test_layer2_improvement(self) -> None: context = _build_episode_context(gold_rows=[(1,), (2,), (3,), (4,)]) try: first_reward = _layer2_progress(context, rows=[(1,)]) assert first_reward == pytest.approx(0.0375) assert context.previous_progress == 0.25 second_reward = _layer2_progress(context, rows=[(1,), (2,), (3,), (4,)]) assert second_reward == pytest.approx(0.1125) assert context.previous_progress == 1.0 finally: context.db_connection.close() def test_layer2_regression_penalized(self) -> None: """Delta-based: regression yields negative reward.""" context = _build_episode_context(gold_rows=[(1, "a", 10)]) try: _layer2_progress(context, rows=[(1, "a", 10)]) assert context.previous_progress == 1.0 reward = _layer2_progress(context, rows=[]) assert reward < 0.0 assert context.previous_progress == 0.0 finally: context.db_connection.close() def test_layer2_recovery_rewarded(self) -> None: """Delta-based: recovery after regression IS rewarded.""" context = _build_episode_context(gold_rows=[(1, "a", 10)]) try: _layer2_progress(context, rows=[(1, "a", 10)]) # -> 1.0 _layer2_progress(context, rows=[]) # -> 0.0 (regression) reward = _layer2_progress(context, rows=[(1, "a", 10)]) # -> 1.0 (recovery) assert reward == pytest.approx(0.15) assert context.previous_progress == 1.0 finally: context.db_connection.close() def test_layer2_empty_gold_rows(self) -> None: context = _build_episode_context(gold_rows=[]) try: reward = _layer2_progress(context, rows=[(1,)]) assert reward == 0.0 assert context.previous_progress == 0.0 finally: context.db_connection.close() def test_layer2_weighted_average(self) -> None: context = _build_episode_context(gold_rows=[(10,), (20,)]) try: reward = _layer2_progress(context, rows=[(10,), (1000,)]) assert reward == pytest.approx(0.075) assert context.previous_progress == 0.5 finally: context.db_connection.close() def test_layer2_updates_previous_progress(self) -> None: context = _build_episode_context(gold_rows=[(1,), (2,), (3,), (4,)]) try: assert context.previous_progress == 0.0 _layer2_progress(context, rows=[(1,), (2,), (3,), (4,)]) assert context.previous_progress == 1.0 finally: context.db_connection.close() class TestComputeStepReward: def test_compute_reward_query_success(self) -> None: context = _build_episode_context(gold_rows=[(10,), (20,)]) try: reward = compute_step_reward( context, action_type="QUERY", sql="SELECT value FROM t", rows=[(10,), (1000,)], error=None, ) assert reward == pytest.approx(0.1) finally: context.db_connection.close() def test_compute_reward_query_error(self) -> None: context = _build_episode_context(gold_rows=[(1,)]) try: reward = compute_step_reward( context, action_type="QUERY", sql="SELECT missing", rows=None, error="no such column", ) assert reward == -0.005 finally: context.db_connection.close() def test_compute_reward_describe(self) -> None: context = _build_episode_context(gold_rows=[(1,)]) try: reward = compute_step_reward( context, action_type="DESCRIBE", sql="DESCRIBE students", rows=[("id", "INTEGER")], error=None, ) assert reward == 0.015 assert context.previous_progress == 0.0 finally: context.db_connection.close() def test_compute_reward_sample(self) -> None: context = _build_episode_context(gold_rows=[(1,)]) try: reward = compute_step_reward( context, action_type="SAMPLE", sql="SELECT * FROM students LIMIT 1", rows=[(1,)], error=None, ) assert reward == 0.015 assert context.previous_progress == 0.0 finally: context.db_connection.close() def test_compute_reward_per_step_cap(self) -> None: """Per-step clipping caps at 0.15.""" context = _build_episode_context(gold_rows=[(1, "a", 10)]) try: reward = compute_step_reward( context, action_type="QUERY", sql="SELECT 1, 'a', 10", rows=[(1, "a", 10)], error=None, ) assert reward <= 0.15 finally: context.db_connection.close() def test_compute_reward_per_step_floor(self) -> None: """Per-step clipping floors at -0.05.""" context = _build_episode_context(gold_rows=[(1, "a", 10)]) try: # First get to high progress compute_step_reward( context, action_type="QUERY", sql="SELECT 1, 'a', 10", rows=[(1, "a", 10)], error=None, ) # Then regress badly (repeat + regression) reward = compute_step_reward( context, action_type="QUERY", sql="SELECT 1, 'a', 10", rows=[(1, "a", 10)], error=None, ) assert reward >= -0.05 finally: context.db_connection.close() def test_compute_reward_no_cumulative_tracking(self) -> None: """Each step is independent — no cumulative state.""" context = _build_episode_context(gold_rows=[(1,)]) try: assert not hasattr(context, "cumulative_step_reward") finally: context.db_connection.close() def test_compute_reward_layer2_skipped_for_describe(self) -> None: context = _build_episode_context(gold_rows=[(1,), (2,)]) try: compute_step_reward( context, action_type="DESCRIBE", sql="DESCRIBE students", rows=[("id", "INTEGER")], error=None, ) assert context.previous_progress == 0.0 finally: context.db_connection.close() def test_compute_reward_layer2_skipped_when_rows_none(self) -> None: context = _build_episode_context(gold_rows=[(1,), (2,)]) try: compute_step_reward( context, action_type="QUERY", sql="SELECT missing", rows=None, error="no such column", ) assert context.previous_progress == 0.0 finally: context.db_connection.close() def test_compute_reward_layer2_skipped_empty_gold(self) -> None: context = _build_episode_context(gold_rows=[]) try: reward = compute_step_reward( context, action_type="QUERY", sql="SELECT 1", rows=[(1,)], error=None, ) assert reward == 0.025 assert context.previous_progress == 0.0 finally: context.db_connection.close()