import concurrent.futures import string import threading import pytest import numpy as np from numpy._core import _rational_tests from numpy.testing import IS_64BIT, IS_WASM from numpy.testing._private.utils import run_threaded if IS_WASM: pytest.skip(allow_module_level=True, reason="no threading support in wasm") def test_parallel_randomstate_creation(): # if the coercion cache is enabled and not thread-safe, creating # RandomState instances simultaneously leads to a data race def func(seed): np.random.RandomState(seed) run_threaded(func, 500, pass_count=True) def test_parallel_ufunc_execution(): # if the loop data cache or dispatch cache are not thread-safe # computing ufuncs simultaneously in multiple threads leads # to a data race that causes crashes or spurious exceptions def func(): arr = np.random.random((25,)) np.isnan(arr) run_threaded(func, 500) # see gh-26690 NUM_THREADS = 50 a = np.ones(1000) def f(b): b.wait() return a.sum() run_threaded(f, NUM_THREADS, pass_barrier=True) def test_temp_elision_thread_safety(): amid = np.ones(50000) bmid = np.ones(50000) alarge = np.ones(1000000) blarge = np.ones(1000000) def func(count): if count % 4 == 0: (amid * 2) + bmid elif count % 4 == 1: (amid + bmid) - 2 elif count % 4 == 2: (alarge * 2) + blarge else: (alarge + blarge) - 2 run_threaded(func, 100, pass_count=True) def test_eigvalsh_thread_safety(): # if lapack isn't thread safe this will randomly segfault or error # see gh-24512 rng = np.random.RandomState(873699172) matrices = ( rng.random((5, 10, 10, 3, 3)), rng.random((5, 10, 10, 3, 3)), ) run_threaded(lambda i: np.linalg.eigvalsh(matrices[i]), 2, pass_count=True) def test_printoptions_thread_safety(): # until NumPy 2.1 the printoptions state was stored in globals # this verifies that they are now stored in a context variable b = threading.Barrier(2) def legacy_113(): np.set_printoptions(legacy='1.13', precision=12) b.wait() po = np.get_printoptions() assert po['legacy'] == '1.13' assert po['precision'] == 12 orig_linewidth = po['linewidth'] with np.printoptions(linewidth=34, legacy='1.21'): po = np.get_printoptions() assert po['legacy'] == '1.21' assert po['precision'] == 12 assert po['linewidth'] == 34 po = np.get_printoptions() assert po['linewidth'] == orig_linewidth assert po['legacy'] == '1.13' assert po['precision'] == 12 def legacy_125(): np.set_printoptions(legacy='1.25', precision=7) b.wait() po = np.get_printoptions() assert po['legacy'] == '1.25' assert po['precision'] == 7 orig_linewidth = po['linewidth'] with np.printoptions(linewidth=6, legacy='1.13'): po = np.get_printoptions() assert po['legacy'] == '1.13' assert po['precision'] == 7 assert po['linewidth'] == 6 po = np.get_printoptions() assert po['linewidth'] == orig_linewidth assert po['legacy'] == '1.25' assert po['precision'] == 7 task1 = threading.Thread(target=legacy_113) task2 = threading.Thread(target=legacy_125) task1.start() task2.start() def test_parallel_reduction(): # gh-28041 NUM_THREADS = 50 x = np.arange(1000) def closure(b): b.wait() np.sum(x) run_threaded(closure, NUM_THREADS, pass_barrier=True) def test_parallel_flat_iterator(): # gh-28042 x = np.arange(20).reshape(5, 4).T def closure(b): b.wait() for _ in range(100): list(x.flat) run_threaded(closure, outer_iterations=100, pass_barrier=True) # gh-28143 def prepare_args(): return [np.arange(10)] def closure(x, b): b.wait() for _ in range(100): y = np.arange(10) y.flat[x] = x run_threaded(closure, pass_barrier=True, prepare_args=prepare_args) def test_multithreaded_repeat(): x0 = np.arange(10) def closure(b): b.wait() for _ in range(100): x = np.repeat(x0, 2, axis=0)[::2] run_threaded(closure, max_workers=10, pass_barrier=True) def test_structured_advanced_indexing(): # Test that copyswap(n) used by integer array indexing is threadsafe # for structured datatypes, see gh-15387. This test can behave randomly. # Create a deeply nested dtype to make a failure more likely: dt = np.dtype([("", "f8")]) dt = np.dtype([("", dt)] * 2) dt = np.dtype([("", dt)] * 2) # The array should be large enough to likely run into threading issues arr = np.random.uniform(size=(6000, 8)).view(dt)[:, 0] rng = np.random.default_rng() def func(arr): indx = rng.integers(0, len(arr), size=6000, dtype=np.intp) arr[indx] tpe = concurrent.futures.ThreadPoolExecutor(max_workers=8) futures = [tpe.submit(func, arr) for _ in range(10)] for f in futures: f.result() assert arr.dtype is dt def test_structured_threadsafety2(): # Nonzero (and some other functions) should be threadsafe for # structured datatypes, see gh-15387. This test can behave randomly. from concurrent.futures import ThreadPoolExecutor # Create a deeply nested dtype to make a failure more likely: dt = np.dtype([("", "f8")]) dt = np.dtype([("", dt)]) dt = np.dtype([("", dt)] * 2) # The array should be large enough to likely run into threading issues arr = np.random.uniform(size=(5000, 4)).view(dt)[:, 0] def func(arr): arr.nonzero() tpe = ThreadPoolExecutor(max_workers=8) futures = [tpe.submit(func, arr) for _ in range(10)] for f in futures: f.result() assert arr.dtype is dt def test_stringdtype_multithreaded_access_and_mutation( dtype, random_string_list): # this test uses an RNG and may crash or cause deadlocks if there is a # threading bug rng = np.random.default_rng(0x4D3D3D3) chars = list(string.ascii_letters + string.digits) chars = np.array(chars, dtype="U1") ret = rng.choice(chars, size=100 * 10, replace=True) random_string_list = ret.view("U100") def func(arr): rnd = rng.random() # either write to random locations in the array, compute a ufunc, or # re-initialize the array if rnd < 0.25: num = np.random.randint(0, arr.size) arr[num] = arr[num] + "hello" elif rnd < 0.5: if rnd < 0.375: np.add(arr, arr) else: np.add(arr, arr, out=arr) elif rnd < 0.75: if rnd < 0.875: np.multiply(arr, np.int64(2)) else: np.multiply(arr, np.int64(2), out=arr) else: arr[:] = random_string_list with concurrent.futures.ThreadPoolExecutor(max_workers=8) as tpe: arr = np.array(random_string_list, dtype=dtype) futures = [tpe.submit(func, arr) for _ in range(500)] for f in futures: f.result() @pytest.mark.skipif( not IS_64BIT, reason="Sometimes causes failures or crashes due to OOM on 32 bit runners" ) def test_legacy_usertype_cast_init_thread_safety(): def closure(b): b.wait() np.full((10, 10), 1, _rational_tests.rational) run_threaded(closure, 250, pass_barrier=True) @pytest.mark.parametrize("dtype", [bool, int, float]) def test_nonzero(dtype): # See: gh-28361 # # np.nonzero uses np.count_nonzero to determine the size of the output. # array. In a second pass the indices of the non-zero elements are # determined, but they can have changed # # This test triggers a data race which is suppressed in the TSAN CI. # The test is to ensure np.nonzero does not generate a segmentation fault x = np.random.randint(4, size=100).astype(dtype) expected_warning = ('number of non-zero array elements changed' ' during function execution') def func(index): for _ in range(10): if index == 0: x[::2] = np.random.randint(2) else: try: _ = np.nonzero(x) except RuntimeError as ex: assert expected_warning in str(ex) run_threaded(func, max_workers=10, pass_count=True, outer_iterations=5)