Skip to content

Isotropic Model

scripts/PyMC_Pytensor_noise_v_main_varsig.py

Samples a single scalar parameter \(q\) where \(w_c = q + 1\). This is the direction-independent (isotropic) Lorentz violation model — the critical velocity is the same for all sources regardless of sky position.

Usage

python scripts/PyMC_Pytensor_noise_v_main_varsig.py

Configuration

Settings are configured by editing variables near the top of the file.

Variable Default Description
dataset "generated_sources.csv" Input CSV. Comment/uncomment to switch to "mojave_cleaned.csv".
n_val_default 20 Half-width of the numerical integration grid in units of each source's σ_v; the likelihood sum uses 2·n_val + 1 points spanning v_t ± n_val·σ_v.

MCMC settings are passed directly in the pm.sample() call:

Parameter Value
draws 1000
tune 1000
target_accept 0.95

!!! note "PyTensor compiler"

    The line `pytensor.config.cxx = "/usr/bin/clang++"` is hardcoded for a specific machine. Comment it out or adjust for your system.

Model Structure

Prior

  • \(q\): TruncatedNormal(sigma=3, lower=-1) — ensures \(w_c = q + 1 > 0\).

Likelihood

Uses the same numerical integration form as the anisotropic model, but with a single scalar \(w_c\) shared across all observations (no per-source direction dependence):

\[ \mathcal{P}_{\mathrm{obs}}(v_t) \approx \sum_{v = v_t - m\sigma_v}^{v_t + m\sigma_v} \frac{1}{\sqrt{2\pi}\,\sigma_v^{3}} \left[ (v - v_t)\exp\!\left(-\frac{(v - v_t)^2}{2\sigma_v^{2}}\right) + (v + v_t)\exp\!\left(-\frac{(v + v_t)^2}{2\sigma_v^{2}}\right) \right] C_v(v)\,\Delta v \]

The CDF function \(C_v(v)\) is evaluated piecewise:

  • If \(v < 0\): \(C_v = 0\)
  • If \(w_c < 1\) (fast-light) and \(w_c^2 + w_t^2 < 1\): \(C_v = 1\)
  • If \(w_c < 1\) (fast-light) and \(w_c^2 + w_t^2 \geq 1\): uses \(\arctan(\sqrt{w_c^2 + w_t^2 - 1})\) expression
  • If \(w_c \geq 1\) (slow-light): uses \(\arctan(w_t / w_c)\) expression

Outputs

Output Description
Trace plot Saved to scripts/trace_iso.png. Shows MCMC chains for \(q\).
Console summary ArviZ summary table with 25th, 50th, and 75th percentiles.

API Reference

PyMC_Pytensor_noise_v_main_varsig.loglike(vt_stack, wc, n_val=n_val_default)

Source code in scripts/PyMC_Pytensor_noise_v_main_varsig.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def loglike(
    vt_stack: pt.TensorVariable,
    wc: pt.TensorVariable,
    n_val: int = n_val_default,
) -> pt.TensorVariable:
    """Total log-likelihood of the observed transverse velocities.

    Implements the numerically-integrated observation probability

        P_obs(v_t) ~= sum_v coefficient(v; v_t, sigma) * C_v(v) * delta_v

    summed over a grid of 2*n_val + 1 points spanning v_t +/- n_val*sigma
    for each source, with a single scalar critical velocity ``wc`` shared
    by all sources (isotropic model).

    Parameters
    ----------
    vt_stack : tensor, shape (N, 2)
        Column 0 is the observed transverse velocity ``v_t`` of each
        source; column 1 is its measurement uncertainty ``sigma``.
    wc : scalar tensor
        Critical velocity, ``wc = q + 1``; assumed strictly positive
        (guaranteed by the TruncatedNormal prior on ``q``).
    n_val : int, optional
        Half-width of the integration grid in units of sigma.

    Returns
    -------
    Scalar tensor: sum over sources of log(P_obs), with P_obs clipped
    below at 1e-12 so the log stays finite.
    """
    vt = vt_stack[:, 0]
    sigma = vt_stack[:, 1]
    # The integration step equals each source's own sigma, so the grid
    # covers vt +/- n_val * sigma with spacing delta_v = sigma.
    delta_v = sigma

    sigma_bcast = sigma[:, None]

    v_offsets = pt.linspace(-n_val, n_val, 2 * n_val + 1)

    # Broadcast to shape (N_sources, 2*n_val + 1).
    vt_bcast = vt[:, None]
    delta_v_bcast = delta_v[:, None]
    v_offsets_bcast = v_offsets[None, :]

    v = vt_bcast + v_offsets_bcast * delta_v_bcast
    v_minus_vt = v - vt_bcast
    v_plus_vt = v + vt_bcast

    def CDF_function(v_inner):
        """Piecewise CDF factor C_v(v) evaluated elementwise on the grid."""
        # w = 1/v; guard v == 0 to avoid a division by zero (the w = 0
        # value is then overridden by the sum_squares < 1 branch below).
        w_inner = pt.switch(pt.eq(v_inner, 0), 0, pt.pow(v_inner, -1))
        wt_squared = pt.pow(w_inner, 2)
        wc_squared = pt.pow(wc, 2)
        sum_squares = wc_squared + wt_squared

        # First expression (used when wc < 1).
        # BUG FIX: pt.switch evaluates (and differentiates) BOTH branches,
        # so sqrt(sum_squares - 1) produced NaN values and NaN gradients
        # whenever sum_squares < 1, even though that branch is ultimately
        # discarded by the switches below. Clamp the argument at 0 so the
        # un-taken branch stays finite; the clamped value is only ever
        # selected when sum_squares >= 1, where it is unchanged.
        square_root = pt.sqrt(pt.maximum(sum_squares - 1, 0.0))
        at = pt.arctan(square_root)

        numer1 = w_inner * (square_root - at)
        expr1 = 1 - numer1 / sum_squares

        # Second expression (used when wc >= 1, slow-light regime).
        at2 = pt.arctan(w_inner / wc)
        numer2 = w_inner**2 - w_inner * at2
        expr2 = 1 - numer2 / sum_squares

        fastslowcondition = pt.lt(wc, 1)

        result = pt.switch(fastslowcondition, expr1, expr2)
        # result is now:
        # expr1 if wc < 1
        # expr2 if wc >= 1

        sumsquarecondition = pt.lt(sum_squares, 1)
        result = pt.switch(sumsquarecondition, 1, result)
        # result is now:
        # 1 if wt^2 + wc^2 < 1 (only reachable when wc < 1)
        # expr1 if wc < 1 and wt^2 + wc^2 >= 1
        # expr2 if wc >= 1

        negvcondition = pt.lt(v_inner, 0)
        result = pt.switch(negvcondition, 0, result)
        # result is now:
        # 0 if v < 0
        # 1 if wc < 1 and wt^2 + wc^2 < 1
        # expr1 if wc < 1 and wt^2 + wc^2 >= 1
        # expr2 if wc >= 1

        return result

    # Folded-Gaussian noise kernel: density contribution of a true
    # velocity v given the observation (vt, sigma).
    coefficient = (
        v_minus_vt * pt.exp(-(v_minus_vt**2) / (2 * sigma_bcast**2))
        + v_plus_vt * pt.exp(-(v_plus_vt**2) / (2 * sigma_bcast**2))
    ) / (pt.sqrt(2 * pt.pi) * sigma_bcast**3)

    summand = coefficient * CDF_function(v)
    sum_over_v = pt.sum(summand, axis=1)

    # Riemann sum: multiply by the grid spacing (delta_v = sigma).
    P_obs = sum_over_v * delta_v
    # Clip so that numerically-zero probabilities do not produce -inf.
    return pt.sum(pt.log(pt.clip(P_obs, 1e-12, np.inf)))

PyMC_Pytensor_noise_v_main_varsig.main()

Source code in scripts/PyMC_Pytensor_noise_v_main_varsig.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def main():
    """Fit the isotropic Lorentz-violation model.

    Loads the configured CSV catalogue, samples the scalar parameter ``q``
    (where ``wc = q + 1``) with NUTS, saves a trace plot to
    ``scripts/trace_iso.png``, and prints an ArviZ summary with quartiles.
    """
    dir_path = os.path.dirname(os.path.realpath(__file__))
    project_root = os.path.abspath(os.path.join(dir_path, os.pardir))

    # Select the input catalogue: simulated sources by default.
    dataset = "generated_sources.csv"
    # dataset = "mojave_cleaned.csv"

    dataSource = os.path.join(project_root, dataset)

    print(f"Running on PyMC v{pm.__version__}")
    if dataset == "generated_sources.csv":
        filetype_choice = "Simulated"
    elif dataset == "mojave_cleaned.csv":
        filetype_choice = "Mojave"
    else:
        # BUG FIX: previously an unknown dataset fell through and crashed
        # later with a confusing NameError on filetype_choice; fail fast
        # with a clear message instead.
        raise ValueError(f"Unknown dataset: {dataset!r}")

    dataAll = importCSV(dataSource, filetype=filetype_choice)

    # Columns 3 and 4 of each imported row hold v_t and sigma_v.
    # NOTE(review): presumed from the indexing here — confirm against
    # importCSV's column layout.
    vt_data = [sublist[3] for sublist in dataAll]
    sigmas = [sublist[4] for sublist in dataAll]

    vt_and_sigma = np.stack(
        [vt_data, sigmas],
        axis=1,
    )

    # Drop any source whose velocity or uncertainty is NaN.
    vt_data_with_sigma = vt_and_sigma[~np.isnan(vt_and_sigma).any(axis=1)]

    model = pm.Model()

    with model:
        # q = wc - 1, to avoid confusion between the model parameter
        # and the inverse speed of light as a function of the parameters.
        q = pm.TruncatedNormal("q", sigma=3, lower=-1)
        wc = q + 1

        # Observed distribution with the custom numerical log-likelihood;
        # registering it on the model is the side effect we need, so the
        # return value is deliberately unused.
        pm.CustomDist("vt_obs", wc, observed=vt_data_with_sigma, logp=loglike)

        trace = pm.sample(1000, tune=1000, target_accept=0.95)

    summary_with_quartiles = az.summary(
        trace,
        stat_funcs={
            "25%": lambda x: np.percentile(x, 25),
            "50%": lambda x: np.percentile(x, 50),
            "75%": lambda x: np.percentile(x, 75),
        },
    )

    az.plot_trace(trace, combined=False)
    plt.savefig(os.path.join(dir_path, "trace_iso.png"), dpi=150, bbox_inches="tight")
    plt.close()

    print(summary_with_quartiles)