# Labwork 4
This labwork concerns the lecture 4.
The final objective is to write the **radix sort**...

Let us recall that this sort relies onto the partition pattern, that itself relies onto exclusive scan and inclusive reverse scan...

## Exercise 1
Here we start with the simple question: write the exclusive scan (so labwork 3 with some basic modification)...

In [None]:
from __future__ import annotations

from typing import Any, Callable

import numpy as np
from numba import cuda, core
from numba.cuda.cudadrv.devicearray import DeviceNDArray
from numba.cuda.cudadrv.driver import Stream
from numba.np.numpy_support import from_dtype


def display(array, all_values=False):
 def aprint(data_range, noend=False):
 for i in data_range:
 print(f"{array[i]} ", end="")
 if (i % 16) == 15: print("")
 if not noend: print("...")

 if all_values:
 aprint(range(array.size), noend=True)
 return
 aprint(range(0, min(array.size, 16)))
 if array.size < 256 - 16: return
 aprint(range(256 - 16, min(array.size, 256 + 16)))
 if array.size < 512 - 16: return
 aprint(range(512 - 16, min(array.size, 512 + 16)))
 if array.size < 768 - 16: return
 aprint(range(768 - 16, min(array.size, 768 + 16)))
 aprint(range(array.size - 16 - 256, array.size - 256 + 16))
 aprint(range(array.size - 16, array.size), noend=True)


class ExclusiveScan(object):
 """Create a scan object that scans values using a given binary
 function. The binary function is compiled once and cached inside this
 object. Keeping this object alive will prevent re-compilation.
 """

 _kernels_block_scan = {}
 _kernels_block_map = {}

 _WARP_SIZE = 32
 _NUM_WARPS = 8

 @staticmethod
 def _gpu_kernel_block_scan_factory(fn, np_type):
 """Factory of kernels for the block-scan problem...

 This function returns a Cuda Kernel that does block-scan of some data using a given binary functor."""

 scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)

 max_block_size = ExclusiveScan._NUM_WARPS * ExclusiveScan._WARP_SIZE

 @cuda.jit(device=True, cache=True)
 def load_shared_memory(shared_memory, d_input):
 local_tid = cuda.threadIdx.x
 tid = cuda.grid(1)
 # TODO: load data into shared memory

 @cuda.jit(device=True, cache=True)
 def pointer_jumping(shared_memory, jump):
 tid = cuda.threadIdx.x
 # TODO: implement the pointer jumping

 @cuda.jit(device=True, cache=True)
 def save_shared_memory(shared_memory, d_output, d_extras, null_value):
 local_tid = cuda.threadIdx.x
 global_tid = cuda.grid(1)
 # TODO: save data from shared memory

 def gpu_scan_block(d_input, d_output, d_extras, null_value):
 """
 Per block SCAN...
 """
 # move data to shared memory
 shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
 load_shared_memory(shared_memory, d_input)

 # implements the logics
 jump = 1
 while jump < cuda.blockDim.x:
 pointer_jumping(shared_memory, jump)
 jump *= 2

 # now stores the result
 save_shared_memory(shared_memory, d_output, d_extras, null_value)

 return cuda.jit(func_or_sig=gpu_scan_block, cache=True)

 def __init__(self, functor):
 """
 :param functor: A function implementing a binary operation for
 scan. It will be compiled as a CUDA device
 function using ``cuda.jit(device=True)``.
 """
 self._functor = functor

 def _compile_block_scan(self, dtype):
 key = self._functor, dtype
 if key not in self._kernels_block_scan:
 self._kernels_block_scan[key] = ExclusiveScan._gpu_kernel_block_scan_factory(self._functor,
 from_dtype(dtype))
 return self._kernels_block_scan[key]

 @staticmethod
 def _gpu_kernel_block_map_factory(fn, np_type):
 """Factory of kernels for the block-map problem...

 This function returns a Cuda Kernel that does block-map of some data using a given binary functor."""

 scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)

 def gpu_map_block(d_io, d_scan):
 """
 Per block MAP...
 """
 # TODO: implements the logics
 tid = cuda.grid(1)
 bid = cuda.blockIdx.x
 if bid == 0:
 # skip the first block (no map needed)
 return
 extra = d_scan[bid]
 d_io[tid] = scan_op(extra, d_io[tid])

 return cuda.jit(func_or_sig=gpu_map_block, cache=True)

 def _compile_block_map(self, dtype):
 key = self._functor, dtype
 if key not in self._kernels_block_map:
 self._kernels_block_map[key] = ExclusiveScan._gpu_kernel_block_map_factory(self._functor, from_dtype(dtype))
 return self._kernels_block_map[key]

 def __call__(self, d_input, d_output, null_value, stream=cuda.default_stream()):
 """ Performs a per-block SCAN.

 :param d_input: A device array with input data.
 :param d_output: A device array to store the result.
 :param null_value: The null value for the
 :param stream: Optional CUDA stream in which to perform the scan.
 If no stream is specified, the default stream of 0 is used.
 """

 # ensure 1d array
 if d_input.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if d_input.size < 1:
 raise ValueError("array's length is 0")

 # ensure arrays' size are the same
 if d_input.size != d_output.size:
 raise ValueError("arrays' length are different ({d_input.size} / {d_output.size}")

 _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

 kernel_step1 = self._compile_block_scan(d_input.dtype)
 kernel_step2 = self._compile_block_map(d_input.dtype)

 max_block_size = ExclusiveScan._WARP_SIZE * ExclusiveScan._NUM_WARPS
 nb_threads = min(max_block_size, d_input.size)
 nb_blocks = (d_input.size + nb_threads - 1) // nb_threads
 extras = cuda.device_array(shape=nb_blocks, dtype=d_input.dtype)

 # Perform the reduction on the GPU
 start_event = cuda.event(True)
 start_event.record(stream=stream)

 kernel_step1[nb_blocks, nb_threads, stream](d_input, d_output, extras, null_value)
 # print(f"launch kernel_step1[{nb_blocks}, {nb_threads}]")
 if nb_blocks > 1:
 # display(extras, all=True)
 self(extras, extras, null_value, stream)
 kernel_step2[nb_blocks, nb_threads, stream](d_output, extras)

 stop_event = cuda.event(True)
 stop_event.record(stream=stream)
 stop_event.synchronize()

 core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

 # display(extras)

 return cuda.event_elapsed_time(start_event, stop_event)


if __name__ == "__main__":

 def check(expected, result):
 it_works = np.array_equal(expected, result)
 if it_works:
 print(f'- seems to work')
 else:
 print(f'- seems to not work, here is your result:')
 display(result)
 display(expected)


 def test_int32(size):
 scanner = ExclusiveScan(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.int32)
 d_result = cuda.device_array(size, dtype=np.int32)
 scanner(cuda.to_device(h_array), d_result, np.int32(0)) # turn on gpu
 ct = scanner(cuda.to_device(h_array), d_result, np.int32(0))
 print(f"scan computation time is {ct} ms")
 expected = np.concatenate((np.zeros(1, dtype=np.int32), np.cumsum(h_array[1:])))
 check(expected, d_result.copy_to_host())


 def test_float32(size):
 scanner = ExclusiveScan(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.float32)
 d_result = cuda.device_array(size, dtype=np.float32)
 scanner(cuda.to_device(h_array), d_result, np.float32(0.0)) # turn on gpu
 ct = scanner(cuda.to_device(h_array), d_result, np.float32(0.0))
 print(f"scan computation time is {ct} ms")
 expected = np.concatenate((np.zeros(1, dtype=np.float32), np.cumsum(h_array[1:])))
 check(expected, d_result.copy_to_host())


 def test_float64(size):
 scanner = ExclusiveScan(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.float64)
 d_result = cuda.device_array(size, dtype=np.float64)
 scanner(cuda.to_device(h_array), d_result, np.float64(0.0)) # turn on gpu
 ct = scanner(cuda.to_device(h_array), d_result, np.float64(0.0))
 print(f"scan computation time is {ct} ms")
 expected = np.concatenate((np.zeros(1, dtype=np.float64), np.cumsum(h_array[1:])))
 check(expected, d_result.copy_to_host())


 test_int32(1 << 24)
 # with float32, the sequential cumsum fails with more than 2^24 number...
 test_float32(1 << 24)
 test_float64(1 << 24)


## Exercise 2
Here we continue with the Inclusive scan in the reversed version (so, labwork 3 with some minor modifications again...).

In [None]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


def display(array, all_values=False):
 def aprint(data_range, noend=False):
 for i in data_range:
 print(f"{array[i]} ", end="")
 if (i % 16) == 15: print("")
 if not noend: print("...")

 if all_values:
 aprint(range(array.size), noend=True)
 return
 aprint(range(0, min(array.size, 16)))
 if array.size < 256 - 16: return
 aprint(range(256 - 16, min(array.size, 256 + 16)))
 if array.size < 512 - 16: return
 aprint(range(512 - 16, min(array.size, 512 + 16)))
 if array.size < 768 - 16: return
 aprint(range(768 - 16, min(array.size, 768 + 16)))
 aprint(range(array.size - 16 - 256, array.size - 256 + 16))
 aprint(range(array.size - 16, array.size), noend=True)


class ReverseIScan(object):
 """Create a scan object that scans values using a given binary
 function. The binary function is compiled once and cached inside this
 object. Keeping this object alive will prevent re-compilation.
 """

 _kernels_block_scan = {}
 _kernels_block_map = {}

 _WARP_SIZE = 32
 _NUM_WARPS = 8

 @staticmethod
 def _gpu_kernel_block_scan_factory(fn, np_type):
 """Factory of kernels for the block-scan problem...

 This function returns a Cuda Kernel that does block-scan of some data using a given binary functor."""

 scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)

 max_block_size = ReverseIScan._NUM_WARPS * ReverseIScan._WARP_SIZE

 @cuda.jit(device=True, cache=True)
 def load_shared_memory(shared_memory, d_input):
 local_tid = cuda.threadIdx.x
 tid = cuda.grid(1)
 # TODO: load data into shared memory

 @cuda.jit(device=True, cache=True)
 def pointer_jumping(shared_memory, jump):
 tid = cuda.threadIdx.x
 # TODO: implement the pointer jumping

 @cuda.jit(device=True, cache=True)
 def save_shared_memory(shared_memory, d_output, d_extras):
 local_tid = cuda.threadIdx.x
 global_tid = cuda.grid(1)
 # TODO: save data from shared memory

 def gpu_scan_block(d_input, d_output, d_extras):
 """
 Per block SCAN...
 """
 # move data to shared memory
 shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
 load_shared_memory(shared_memory, d_input)

 # implements the logics
 jump = 1
 while jump < cuda.blockDim.x:
 pointer_jumping(shared_memory, jump)
 jump *= 2

 # now stores the result
 save_shared_memory(shared_memory, d_output, d_extras)

 return cuda.jit(func_or_sig=gpu_scan_block, cache=True)

 def __init__(self, functor):
 """
 :param functor: A function implementing a binary operation for
 scan. It will be compiled as a CUDA device
 function using ``cuda.jit(device=True)``.
 """
 self._functor = functor

 def _compile_block_scan(self, dtype):
 key = self._functor, dtype
 if key not in self._kernels_block_scan:
 self._kernels_block_scan[key] = ReverseIScan._gpu_kernel_block_scan_factory(self._functor,
 from_dtype(dtype))
 return self._kernels_block_scan[key]

 @staticmethod
 def _gpu_kernel_block_map_factory(fn, np_type):
 """Factory of kernels for the block-map problem...

 This function returns a Cuda Kernel that does block-map of some data using a given binary functor."""

 scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)

 def gpu_map_block(d_io, d_scan):
 """
 Per block MAP...
 """
 # TODO: implements the logics
 tid = cuda.grid(1)
 bid = cuda.blockIdx.x
 if bid + 1 == cuda.gridDim.x:
 # skip the last block (no map needed)
 return
 extra = d_scan[bid + 1]
 d_io[tid] = scan_op(extra, d_io[tid])

 return cuda.jit(func_or_sig=gpu_map_block, cache=True)

 def _compile_block_map(self, dtype):
 key = self._functor, dtype
 if key not in self._kernels_block_map:
 self._kernels_block_map[key] = ReverseIScan._gpu_kernel_block_map_factory(self._functor, from_dtype(dtype))
 return self._kernels_block_map[key]

 def __call__(self, d_input, d_output, stream=cuda.default_stream()):
 """ Performs a per-block SCAN.

 :param d_input: A device array with input data.
 :param d_output: A device array to store the result.
 :param stream: Optional CUDA stream in which to perform the scan.
 If no stream is specified, the default stream of 0 is used.
 """

 # ensure 1d array
 if d_input.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if d_input.size < 1:
 raise ValueError("array's length is 0")

 # ensure arrays' size are the same
 if d_input.size != d_output.size:
 raise ValueError("arrays' length are different ({d_input.size} / {d_output.size}")

 _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

 kernel_step1 = self._compile_block_scan(d_input.dtype)
 kernel_step2 = self._compile_block_map(d_input.dtype)

 max_block_size = ReverseIScan._WARP_SIZE * ReverseIScan._NUM_WARPS
 nb_threads = min(max_block_size, d_input.size)
 nb_blocks = (d_input.size + nb_threads - 1) // nb_threads
 extras = cuda.device_array(shape=nb_blocks, dtype=d_input.dtype)

 # Perform the reduction on the GPU
 start_event = cuda.event(True)
 start_event.record(stream=stream)

 kernel_step1[nb_blocks, nb_threads, stream](d_input, d_output, extras)
 # print(f"launch kernel_step1[{nb_blocks}, {nb_threads}]")
 if nb_blocks > 1:
 # display(extras, all=True)
 self(extras, extras, stream)
 kernel_step2[nb_blocks, nb_threads, stream](d_output, extras)

 stop_event = cuda.event(True)
 stop_event.record(stream=stream)
 stop_event.synchronize()

 core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

 # display(extras)

 return cuda.event_elapsed_time(start_event, stop_event)


if __name__ == "__main__":

 def check(expected, result):
 it_works = np.array_equal(expected, result)
 if it_works:
 print(f'- seems to work')
 else:
 print(f'- seems to not work, here is your result:')
 display(result)
 display(expected)


 def test_int32(size):
 scanner = ReverseIScan(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.int32)
 d_result = cuda.device_array(size, dtype=np.int32)
 scanner(cuda.to_device(h_array), d_result, np.int32(0)) # turn on gpu
 ct = scanner(cuda.to_device(h_array), d_result, np.int32(0))
 print(f"scan computation time is {ct} ms")
 expected = np.flip(np.cumsum(h_array))
 check(expected, d_result.copy_to_host())


 def test_float32(size):
 scanner = ReverseIScan(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.float32)
 d_result = cuda.device_array(size, dtype=np.float32)
 scanner(cuda.to_device(h_array), d_result, np.float32(0.0)) # turn on gpu
 ct = scanner(cuda.to_device(h_array), d_result, np.float32(0.0))
 print(f"scan computation time is {ct} ms")
 expected = np.flip(np.cumsum(h_array))
 check(expected, d_result.copy_to_host())


 def test_float64(size):
 scanner = ReverseIScan(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.float64)
 d_result = cuda.device_array(size, dtype=np.float64)
 scanner(cuda.to_device(h_array), d_result, np.float64(0.0)) # turn on gpu
 ct = scanner(cuda.to_device(h_array), d_result, np.float64(0.0))
 print(f"scan computation time is {ct} ms")
 expected = np.flip(np.cumsum(h_array))
 check(expected, d_result.copy_to_host())


 test_int32(1 << 24)
 # with float32, the sequential cumsum fails with more than 2^24 number...
 test_float32(1 << 24)
 test_float64(1 << 24)


## Exercise 3
The Partition pattern needs also a Unary Map pattern to generate the array of predicates... 
Fill the following classes to implement the kernel logics of unary and binary map. 

In [None]:
class UnaryMap:
 _kernels_cache = {}

 def __init__(self: UnaryMap, functor) -> None:
 self.__functor = functor

 @staticmethod
 def _gpu_kernel_factory(fn, np_type):
 """Factory of kernels for the unary transform problem...

 This function returns a Cuda Kernel that does the unary map of some data using a given unary functor."""

 umap_op = cuda.jit(func_or_sig=fn, device=True, cache=True)

 def kernel(d_input, d_output) -> None:
 tid = cuda.grid(1)
 # TODO

 return cuda.jit(func_or_sig=kernel, cache=True)

 @staticmethod
 def _compile(functor, dtype):

 key = functor, dtype
 if key not in UnaryMap._kernels_cache:
 UnaryMap._kernels_cache[key] = UnaryMap._gpu_kernel_factory(functor, from_dtype(dtype))

 return UnaryMap._kernels_cache[key]

 def __call__(self, arr, res=None, stream=0):
 """Performs a full unary transform.

 :param arr: A host or device array.
 :param res: Optional device array into which to write the transform
 result to. The result is written into the first element of
 this array. If this parameter is specified, then no
 communication of the map output takes place from the
 device to the host.
 :param stream: Optional CUDA stream in which to perform the transform.
 If no stream is specified, the default stream of 0 is used.
 :return: If ``res`` is specified, ``None`` is returned. Otherwise, the
 result of the reduction is returned.
 """

 # ensure 1d array
 if arr.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if arr.size < 1:
 raise ValueError("array's length is 0")

 kernel = self._compile(self.__functor, arr.dtype)

 # Perform the MAP on the GPU
 nb_threads = 512
 nb_blocks = (arr.size + nb_threads - 1) // nb_threads

 dest = res
 if dest is None:
 dest = cuda.device_array_like(arr)

 elif dest.size != arr.size:
 raise ValueError("destination array's length does not equal input one")

 kernel[nb_blocks, nb_threads, stream](arr, dest)

 if res is not None:
 return None

 return dest


class BinaryMap:
 _kernels_cache = {}

 def __init__(self: BinaryMap, functor) -> None:
 self.__functor = functor

 @staticmethod
 def _gpu_kernel_factory(fn, np_type):
 """Factory of kernels for the binary transform problem...

 This function returns a Cuda Kernel that does the binary map of some data using a given binary functor."""

 map_op = cuda.jit(func_or_sig=fn, device=True, cache=True)

 def kernel(d_input_left, d_input_right, d_output) -> None:
 tid = cuda.grid(1)
 # TODO

 return cuda.jit(func_or_sig=kernel, cache=True)

 @staticmethod
 def _compile(functor, dtype):

 key = functor, dtype
 if key not in BinaryMap._kernels_cache:
 BinaryMap._kernels_cache[key] = BinaryMap._gpu_kernel_factory(functor, from_dtype(dtype))

 return BinaryMap._kernels_cache[key]

 def __call__(self, left, right, res=None, stream=0):
 """Performs a full binary transform.

 :param left: A host or device array, used as the left operand.
 :param right: A host or device array, used as the right operand.
 :param res: Optional device array into which to write the transform
 result to. The result is written into the first element of
 this array. If this parameter is specified, then no
 communication of the map output takes place from the
 device to the host.
 :param stream: Optional CUDA stream in which to perform the transform.
 If no stream is specified, the default stream of 0 is used.
 :return: If ``res`` is specified, ``None`` is returned. Otherwise, the
 result of the reduction is returned.
 """

 # ensure 1d array
 if left.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if left.size < 1:
 raise ValueError("array's length is 0")

 # ensure same size
 if left.size != right.size:
 raise ValueError("input arrays' length are not equal")

 dest = res
 if dest is None:
 dest = cuda.device_array_like(left)

 elif dest.size != left.size:
 raise ValueError("destination array's length does not equal input ones")

 kernel = self._compile(self.__functor, left.dtype)

 # Perform the MAP on the GPU
 nb_threads = 512
 nb_blocks = (left.size + nb_threads - 1) // nb_threads

 kernel[nb_blocks, nb_threads, stream](left, right, dest)

 if res is not None:
 return None

 return dest

if __name__ == "__main__":
 def check(expected, result):
 if np.array_equal(expected, result):
 print("it seems to work!")
 else:
 print("it doesn't work!")

 def test_x2(size):
 rng = np.random.default_rng()
 h_array = rng.integers(low=0, high=1<<20, size=size, dtype=np.int32)
 d_array = cuda.to_device(h_array)
 d_result = cuda.device_array_like(d_array)
 umap = UnaryMap(lambda x: 2 * x)
 umap(d_array, d_result)
 check(h_array * 2, d_result.copy_to_host())

 def test_diff(size):
 rng = np.random.default_rng()
 h_left = rng.integers(low=0, high=1 << 20, size=size, dtype=np.int32)
 h_right = rng.integers(low=0, high=1 << 20, size=size, dtype=np.int32)
 d_left = cuda.to_device(h_left)
 d_right = cuda.to_device(h_right)
 d_result = cuda.device_array_like(d_left)
 bmap = BinaryMap(lambda l, r: l - r)
 bmap(d_left, d_right, d_result)
 check(h_left - h_right, d_result.copy_to_host())

 test_x2(1<<24)
 test_diff(1<<24)

## Exercise 4

The partition ends with a permutation (the scatter). 
Fill the following class for this purpose... And since we are speaking about permutation, do also the gather for the same price :-)

Notice that the mapping is given through a functor, that can internaly use an array for instance or some private data...

In [None]:
class Scatter:
 _kernels_cache = {}

 def __init__(self: Scatter, functor: Callable) -> None:
 self.__functor = functor

 @staticmethod
 def _gpu_kernel_factory(fn: Callable, np_type: Any) -> cuda.FakeCUDAKernel:
 """Factory of kernels for the scatter permutation...

 This function returns a Cuda Kernel that does the permutation of some data."""

 map_op = cuda.jit(func_or_sig=fn, device=True, cache=True)

 def kernel(d_input, d_output) -> None:
 tid = cuda.grid(1)
 # TODO

 return cuda.jit(func_or_sig=kernel, cache=True)

 @staticmethod
 def _compile(functor: Callable, dtype: Any) -> cuda.FakeCUDAKernel:

 key = functor, dtype
 if key not in Scatter._kernels_cache:
 Scatter._kernels_cache[key] = Scatter._gpu_kernel_factory(functor, from_dtype(dtype))

 return Scatter._kernels_cache[key]

 def __call__(self, src: DeviceNDArray, dest: DeviceNDArray, stream: int = 0) -> None:
 """Performs a full scatter.

 :param src: A host or device array.
 :param dst: Device array into which to write the permuted data to.
 :param stream: Optional CUDA stream in which to perform the transform.
 If no stream is specified, the default stream of 0 is used.
 """

 # ensure device array
 cuda.cudadrv.devicearray.require_cuda_ndarray(src)
 cuda.cudadrv.devicearray.require_cuda_ndarray(dest)

 # ensure 1d array
 if src.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if src.size < 1:
 raise ValueError("array's length is 0")

 # ensure same shape/size
 if (src.shape != dest.shape) or (src.size != dest.size):
 raise ValueError("source and destination arrays' length are different")

 kernel = self._compile(self.__functor, src.dtype)

 # Perform the SCATTER on the GPU
 nb_threads = 512
 nb_blocks = (src.size + nb_threads - 1) // nb_threads
 kernel[nb_blocks, nb_threads, stream](src, dest)


class Gather:
 _kernels_cache = {}

 def __init__(self: Gather, functor: Callable) -> None:
 self.__functor = functor

 @staticmethod
 def _gpu_kernel_factory(fn: Callable, np_type: Any) -> cuda.FakeCUDAKernel:
 """Factory of kernels for the gather permutation...

 This function returns a Cuda Kernel that does the permutation of some data."""

 map_op = cuda.jit(device=True)(fn)

 def kernel(d_input, d_output) -> None:
 tid = cuda.grid(1)
 # TODO

 return cuda.jit(func_or_sig=kernel, cache=True)

 @staticmethod
 def _compile(functor: Callable, dtype: Any) -> cuda.FakeCUDAKernel:

 key = functor, dtype
 if key not in Gather._kernels_cache:
 Gather._kernels_cache[key] = Gather._gpu_kernel_factory(functor, from_dtype(dtype))

 return Gather._kernels_cache[key]

 def __call__(self, src: DeviceNDArray, dest: DeviceNDArray, stream: int = 0) -> None:
 """Performs a full gather.

 :param src: A host or device array.
 :param dst: Device array into which to write the permuted data to.
 :param stream: Optional CUDA stream in which to perform the transform.
 If no stream is specified, the default stream of 0 is used.
 """

 # ensure device array
 cuda.cudadrv.devicearray.require_cuda_ndarray(src)
 cuda.cudadrv.devicearray.require_cuda_ndarray(dest)

 # ensure 1d array
 if src.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if src.size < 1:
 raise ValueError("array's length is 0")

 # ensure same shape/size
 if (src.shape != dest.shape) or (src.size != dest.size):
 raise ValueError("arrays' share/size are not equal")

 kernel = self._compile(self.__functor, src.dtype)

 # Perform the gather on the GPU
 nb_threads = 512
 nb_blocks = (src.size + nb_threads - 1) // nb_threads

 kernel[nb_blocks, nb_threads, stream](src, dest)


if __name__ == "__main__":

 def check(array, expected, msg):
 error = 0
 middle = array.size // 2
 for i in range(array.size):
 e = expected[0 if i < middle else 1]
 if array[i] != e:
 error = error + 1
 if error > 0:
 print(f"{msg} does not work: {error} errors generated")
 else:
 print(f"{msg} seems to work")

 def test_scatter(size):
 h_arr = np.array([0,1], dtype=np.int32)
 h_arr = np.tile(h_arr, size//2)
 d_arr = cuda.to_device(h_arr)
 d_dest = cuda.device_array_like(d_arr)

 middle = size // 2

 def map(tid):
 if (tid % 2) == 0:
 return tid // 2
 return middle + (tid//2)

 scatterer = Scatter(map)
 scatterer(d_arr, d_dest)

 d_dest.copy_to_host(h_arr)

 check(h_arr, [0,1], "scatter")

 def test_gather(size):
 h_arr = np.array([0,1], dtype=np.int32)
 h_arr = np.tile(h_arr, size//2)
 d_arr = cuda.to_device(h_arr)
 d_dest = cuda.device_array_like(d_arr)

 middle = size // 2

 def map(tid):
 if tid < middle:
 return 2 * tid
 tid = tid - middle
 return 1 + 2 * tid

 gatherer = Gather(map)
 gatherer(d_arr, d_dest)

 d_dest.copy_to_host(h_arr)

 check(h_arr, [0,1], "gather")

 test_scatter(1<<20)
 test_gather(1<<20)

## Exercise 5
Now that you have everything, writing the partition pattern you must do.


In [None]:
class Partition:
 """
 Partition parallel pattern.

 Do a partition of a vector of data, using a functor that implements the predicate.
 All values for which the predicate is true are placed first, then the others.
 This pattern respect the relative order of the two subsets of data.
 """
 _scatter_kernels_cache = {}

 def __init__(self: Partition, predicate: Callable) -> None:
 d_predicate = cuda.jit(func_or_sig=predicate, device=True, cache=True)
 self.__predicate = d_predicate

 @staticmethod
 def _gpu_kernel_factory(predicate, np_type: Any) -> cuda.FakeCUDAKernel:
 """Factory of kernels for the scatter permutation...

 This function returns a Cuda Kernel that does the permutation of some data."""

 def kernel(d_input, d_output, d_up, d_down) -> None:
 tid = cuda.grid(1)
 # TODO: the scatter using predicate...
 
 return cuda.jit(func_or_sig=kernel, cache=True)

 @staticmethod
 def _compile(functor: Callable, dtype: Any) -> cuda.FakeCUDAKernel:

 key = functor, dtype
 if key not in Partition._scatter_kernels_cache:
 Partition._scatter_kernels_cache[key] = Partition._gpu_kernel_factory(functor, from_dtype(dtype))

 return Partition._scatter_kernels_cache[key]

 def __call__(self, src: DeviceNDArray, dest: DeviceNDArray,
 stream: Stream = cuda.default_stream()) -> float:
 cuda.cudadrv.devicearray.require_cuda_ndarray(src)
 cuda.cudadrv.devicearray.require_cuda_ndarray(dest)

 # ensure 1d array
 if src.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if src.size < 1:
 raise ValueError("array's length is 0")

 # ensure same shape/size
 if (src.shape != dest.shape) or (src.size != dest.size):
 raise ValueError("source and destination arrays' length are different")

 up = cuda.device_array(shape=src.shape, dtype=np.int32, stream=stream)
 down = cuda.device_array(shape=src.shape, dtype=np.int32, stream=stream)

 start_event = cuda.event(True)
 start_event.record(stream=stream)
 
 # TODO: fill the logic
 
 stop_event = cuda.event(True)
 stop_event.record(stream=stream)
 stop_event.synchronize()

 return cuda.event_elapsed_time(start_event, stop_event)


if __name__ == "__main__":
 def check(result, reversed):
 error = 0
 if reversed:
 nb_ones = 0
 for i in range(1, len(result)):
 if result[i] == 1:
 nb_ones += 1
 elif nb_ones > 0:
 error += 1
 else:
 nb_zeros = 0
 for i in range(1, len(result)):
 if result[i] == 0:
 nb_zeros += 1
 elif nb_zeros>0:
 error += 1
 if error == 0:
 print("well done, seems to work")
 else:
 print(f"ouch, you have {error} errors")
 display(result)


 def test_int32(size, reversed=False):
 rng = np.random.default_rng()
 h_array = rng.integers(low=0, high=2, size=size, dtype=np.int32)
 d_array = cuda.to_device(h_array)
 d_result = cuda.device_array(shape=d_array.shape, dtype=d_array.dtype)
 # the predicate move 0 first (so it inverses the value)
 partition = Partition(lambda x: 1 - x) if reversed else Partition(lambda x: x)
 partition(d_array, d_result, np.int32(0))
 ct = partition(d_array, d_result, np.int32(0))
 check(d_result.copy_to_host(), reversed)
 print(f"Computation time is {ct} ms")


 def test_int64(size, reversed=False):
 rng = np.random.default_rng()
 h_array = rng.integers(low=0, high=2, size=size, dtype=np.int64)
 d_array = cuda.to_device(h_array)
 d_result = cuda.device_array(shape=d_array.shape, dtype=d_array.dtype)
 # the predicate move 0 first (so it inverses the value)
 partition = Partition(lambda x: 1 - x) if reversed else Partition(lambda x: x)
 partition(d_array, d_result, np.int64(0))
 ct = partition(d_array, d_result, np.int64(0))
 check(d_result.copy_to_host(), reversed)
 print(f"Computation time is {ct} ms")


 test_int32(1 << 25)
 test_int64(1 << 25)
 test_int32(1 << 25, True)
 test_int64(1 << 25, True)


## Exercise 6
To conclude this labwork, you must write the radix sort algorithm.
This algorithm should work for integers only, with a given size (e.g. 4 bytes)...

To optimize it, you may use temporary destination array and do partition for src to this temporary array, then from this temporary to dst, and so on using dst and temporary array... 

In [None]:
class RadixSort:

 def __call__(self, src, dest):
 cuda.cudadrv.devicearray.require_cuda_ndarray(src)
 cuda.cudadrv.devicearray.require_cuda_ndarray(dest)

 # ensure 1d array
 if src.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if src.size < 1:
 raise ValueError("array's length is 0")

 # ensure same shape/size
 if (src.shape != dest.shape) or (src.size != dest.size):
 raise ValueError("source and destination arrays' length are different")

 nb_bytes = src.nbytes // src.size

 temp = [
 dest,
 cuda.device_array(shape=src.shape, dtype=src.dtype)
 ]

 temp[0].copy_to_device(src)

 ct = 0.0

 for bit in range(0, 8 * nb_bytes, 2):
 # TODO
 pass

 return ct


if __name__ == "__main__":
 def check(result, expected):
 answer = np.array_equal(expected, result)
 if answer:
 print("well done, seems to work")
 else:
 print(f"ouch, it doesn't work")
 display(result)


 def test_int32(size, reversed=False):
 rng = np.random.default_rng()
 h_array = rng.integers(low=0, high=(1<<31)-1, size=size, dtype=np.int32)
 d_array = cuda.to_device(h_array)
 d_result = cuda.device_array(shape=d_array.shape, dtype=d_array.dtype)
 # the predicate move 0 first (so it inverses the value)
 _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False
 sort = RadixSort()
 sort(d_array, d_result)
 ct = sort(d_array, d_result)
 print(f"Computation time is {ct} ms")
 check(d_result.copy_to_host(), np.sort(h_array))
 core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav


 test_int32(1 << 25)
