{ "cells": [ { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "# Labwork 4\n", "This labwork concerns the lecture 4.\n", "The final objective is to write the **radix sort**...\n", "\n", "Let us recall that this sort relies onto the partition pattern, that itself relies onto exclusive scan and inclusive reverse scan..." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise 1\n", "Here we start with the simple question: write the exclusive scan (so labwork 3 with some basic modification)..." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from __future__ import annotations\n", "\n", "from typing import Any, Callable\n", "\n", "import numpy as np\n", "from numba import cuda, core\n", "from numba.cuda.cudadrv.devicearray import DeviceNDArray\n", "from numba.cuda.cudadrv.driver import Stream\n", "from numba.np.numpy_support import from_dtype\n", "\n", "\n", "def display(array, all_values=False):\n", " def aprint(data_range, noend=False):\n", " for i in data_range:\n", " print(f\"{array[i]} \", end=\"\")\n", " if (i % 16) == 15: print(\"\")\n", " if not noend: print(\"...\")\n", "\n", " if all_values:\n", " aprint(range(array.size), noend=True)\n", " return\n", " aprint(range(0, min(array.size, 16)))\n", " if array.size < 256 - 16: return\n", " aprint(range(256 - 16, min(array.size, 256 + 16)))\n", " if array.size < 512 - 16: return\n", " aprint(range(512 - 16, min(array.size, 512 + 16)))\n", " if array.size < 768 - 16: return\n", " aprint(range(768 - 16, min(array.size, 768 + 16)))\n", " aprint(range(array.size - 16 - 256, array.size - 256 + 16))\n", " aprint(range(array.size - 16, array.size), noend=True)\n", "\n", "\n", "class ExclusiveScan(object):\n", " \"\"\"Create a scan object that scans values using a given binary\n", " function. The binary function is compiled once and cached inside this\n", " object. 
Keeping this object alive will prevent re-compilation.\n", " \"\"\"\n", "\n", " _kernels_block_scan = {}\n", " _kernels_block_map = {}\n", "\n", " _WARP_SIZE = 32\n", " _NUM_WARPS = 8\n", "\n", " @staticmethod\n", " def _gpu_kernel_block_scan_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the block-scan problem...\n", "\n", " This function returns a Cuda Kernel that does block-scan of some data using a given binary functor.\"\"\"\n", "\n", " scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)\n", "\n", " max_block_size = ExclusiveScan._NUM_WARPS * ExclusiveScan._WARP_SIZE\n", "\n", " @cuda.jit(device=True, cache=True)\n", " def load_shared_memory(shared_memory, d_input):\n", " local_tid = cuda.threadIdx.x\n", " tid = cuda.grid(1)\n", " # TODO: load data into shared memory\n", "\n", " @cuda.jit(device=True, cache=True)\n", " def pointer_jumping(shared_memory, jump):\n", " tid = cuda.threadIdx.x\n", " # TODO: implement the pointer jumping\n", "\n", " @cuda.jit(device=True, cache=True)\n", " def save_shared_memory(shared_memory, d_output, d_extras, null_value):\n", " local_tid = cuda.threadIdx.x\n", " global_tid = cuda.grid(1)\n", " # TODO: save data from shared memory\n", "\n", " def gpu_scan_block(d_input, d_output, d_extras, null_value):\n", " \"\"\"\n", " Per block SCAN...\n", " \"\"\"\n", " # move data to shared memory\n", " shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)\n", " load_shared_memory(shared_memory, d_input)\n", "\n", " # implements the logics\n", " jump = 1\n", " while jump < cuda.blockDim.x:\n", " pointer_jumping(shared_memory, jump)\n", " jump *= 2\n", "\n", " # now stores the result\n", " save_shared_memory(shared_memory, d_output, d_extras, null_value)\n", "\n", " return cuda.jit(func_or_sig=gpu_scan_block, cache=True)\n", "\n", " def __init__(self, functor):\n", " \"\"\"\n", " :param functor: A function implementing a binary operation for\n", " scan. 
It will be compiled as a CUDA device\n",
    "            function using ``cuda.jit(device=True)``.\n",
    "        \"\"\"\n",
    "        self._functor = functor\n",
    "\n",
    "    def _compile_block_scan(self, dtype):\n",
    "        key = self._functor, dtype\n",
    "        if key not in self._kernels_block_scan:\n",
    "            self._kernels_block_scan[key] = ExclusiveScan._gpu_kernel_block_scan_factory(self._functor,\n",
    "                                                                                         from_dtype(dtype))\n",
    "        return self._kernels_block_scan[key]\n",
    "\n",
    "    @staticmethod\n",
    "    def _gpu_kernel_block_map_factory(fn, np_type):\n",
    "        \"\"\"Factory of kernels for the block-map problem...\n",
    "\n",
    "        This function returns a Cuda Kernel that does block-map of some data using a given binary functor.\"\"\"\n",
    "\n",
    "        scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)\n",
    "\n",
    "        def gpu_map_block(d_io, d_scan):\n",
    "            \"\"\"\n",
    "            Per block MAP...\n",
    "            \"\"\"\n",
    "            # map logic: combine each element with the scanned total of the previous blocks\n",
    "            tid = cuda.grid(1)\n",
    "            bid = cuda.blockIdx.x\n",
    "            if bid == 0:\n",
    "                # skip the first block (no map needed)\n",
    "                return\n",
    "            extra = d_scan[bid]\n",
    "            d_io[tid] = scan_op(extra, d_io[tid])\n",
    "\n",
    "        return cuda.jit(func_or_sig=gpu_map_block, cache=True)\n",
    "\n",
    "    def _compile_block_map(self, dtype):\n",
    "        key = self._functor, dtype\n",
    "        if key not in self._kernels_block_map:\n",
    "            self._kernels_block_map[key] = ExclusiveScan._gpu_kernel_block_map_factory(self._functor, from_dtype(dtype))\n",
    "        return self._kernels_block_map[key]\n",
    "\n",
    "    def __call__(self, d_input, d_output, null_value, stream=cuda.default_stream()):\n",
    "        \"\"\" Performs a full exclusive SCAN (block by block, then combined across blocks).\n",
    "\n",
    "        :param d_input: A device array with input data.\n",
    "        :param d_output: A device array to store the result.\n",
    "        :param null_value: The null value (identity element) of the scan operation.\n",
    "        :param stream: Optional CUDA stream in which to perform the scan.\n",
    "                       If no stream is specified, the default stream of 0 is used.\n",
    "        \"\"\"\n",
    "\n",
    "        # ensure 1d array\n",
    "        if d_input.ndim != 1:\n",
    "            raise TypeError(\"only support 1D array\")\n",
    "\n",
    "        # ensure size > 0\n",
    "        if d_input.size < 1:\n",
    "            raise ValueError(\"array's length is 0\")\n",
    "\n",
    "        # ensure arrays' sizes are the same\n",
    "        if d_input.size != d_output.size:\n",
    "            raise ValueError(f\"arrays' lengths are different ({d_input.size} / {d_output.size})\")\n",
    "\n",
    "        _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False\n",
    "\n",
    "        kernel_step1 = self._compile_block_scan(d_input.dtype)\n",
    "        kernel_step2 = self._compile_block_map(d_input.dtype)\n",
    "\n",
    "        max_block_size = ExclusiveScan._WARP_SIZE * ExclusiveScan._NUM_WARPS\n",
    "        nb_threads = min(max_block_size, d_input.size)\n",
    "        nb_blocks = (d_input.size + nb_threads - 1) // nb_threads\n",
    "        extras = cuda.device_array(shape=nb_blocks, dtype=d_input.dtype)\n",
    "\n",
    "        # Perform the scan on the GPU\n",
    "        start_event = cuda.event(True)\n",
    "        start_event.record(stream=stream)\n",
    "\n",
    "        kernel_step1[nb_blocks, nb_threads, stream](d_input, d_output, extras, null_value)\n",
    "        # print(f\"launch kernel_step1[{nb_blocks}, {nb_threads}]\")\n",
    "        if nb_blocks > 1:\n",
    "            # display(extras, all=True)\n",
    "            self(extras, extras, null_value, stream)\n",
    "            kernel_step2[nb_blocks, nb_threads, stream](d_output, extras)\n",
    "\n",
    "        stop_event = cuda.event(True)\n",
    "        stop_event.record(stream=stream)\n",
    "        stop_event.synchronize()\n",
    "\n",
    "        core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav\n",
    "\n",
    "        # display(extras)\n",
    "\n",
    "        return cuda.event_elapsed_time(start_event, stop_event)\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
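    "\n",
    "    # For reference only (a hypothetical helper, not used by the tests below):\n",
    "    # a CPU sketch of the exclusive scan that the GPU version must reproduce.\n",
    "    # out[i] combines all inputs strictly before index i, starting from the\n",
    "    # null value (the identity element of the binary operation).\n",
    "    def cpu_exclusive_scan(values, op, null_value):\n",
    "        out = []\n",
    "        acc = null_value\n",
    "        for v in values:\n",
    "            out.append(acc)\n",
    "            acc = op(acc, v)\n",
    "        return out\n",
    "\n",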
"\n", " def check(expected, result):\n", " it_works = np.array_equal(expected, result)\n", " if it_works:\n", " print(f'- seems to work')\n", " else:\n", " print(f'- seems to not work, here is your result:')\n", " display(result)\n", " display(expected)\n", "\n", "\n", " def test_int32(size):\n", " scanner = ExclusiveScan(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.int32)\n", " d_result = cuda.device_array(size, dtype=np.int32)\n", " scanner(cuda.to_device(h_array), d_result, np.int32(0)) # turn on gpu\n", " ct = scanner(cuda.to_device(h_array), d_result, np.int32(0))\n", " print(f\"scan computation time is {ct} ms\")\n", " expected = np.concatenate((np.zeros(1, dtype=np.int32), np.cumsum(h_array[1:])))\n", " check(expected, d_result.copy_to_host())\n", "\n", "\n", " def test_float32(size):\n", " scanner = ExclusiveScan(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.float32)\n", " d_result = cuda.device_array(size, dtype=np.float32)\n", " scanner(cuda.to_device(h_array), d_result, np.float32(0.0)) # turn on gpu\n", " ct = scanner(cuda.to_device(h_array), d_result, np.float32(0.0))\n", " print(f\"scan computation time is {ct} ms\")\n", " expected = np.concatenate((np.zeros(1, dtype=np.float32), np.cumsum(h_array[1:])))\n", " check(expected, d_result.copy_to_host())\n", "\n", "\n", " def test_float64(size):\n", " scanner = ExclusiveScan(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.float64)\n", " d_result = cuda.device_array(size, dtype=np.float64)\n", " scanner(cuda.to_device(h_array), d_result, np.float64(0.0)) # turn on gpu\n", " ct = scanner(cuda.to_device(h_array), d_result, np.float64(0.0))\n", " print(f\"scan computation time is {ct} ms\")\n", " expected = np.concatenate((np.zeros(1, dtype=np.float64), np.cumsum(h_array[1:])))\n", " check(expected, d_result.copy_to_host())\n", "\n", "\n", " test_int32(1 << 24)\n", " # with float32, the sequential cumsum fails with more than 2^24 number...\n", " test_float32(1 << 24)\n", " test_float64(1 << 24)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise 2\n", "Here we continue with the Inclusive scan in the reversed version (so, labwork 3 with some minor modifications again...)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from __future__ import annotations\n", "\n", "import numpy as np\n", "from numba.np.numpy_support import from_dtype\n", "from numba import cuda, core\n", "\n", "\n", "def display(array, all_values=False):\n", " def aprint(data_range, noend=False):\n", " for i in data_range:\n", " print(f\"{array[i]} \", end=\"\")\n", " if (i % 16) == 15: print(\"\")\n", " if not noend: print(\"...\")\n", "\n", " if all_values:\n", " aprint(range(array.size), noend=True)\n", " return\n", " aprint(range(0, min(array.size, 16)))\n", " if array.size < 256 - 16: return\n", " aprint(range(256 - 16, min(array.size, 256 + 16)))\n", " if array.size < 512 - 16: return\n", " aprint(range(512 - 16, min(array.size, 512 + 16)))\n", " if array.size < 768 - 16: return\n", " aprint(range(768 - 16, min(array.size, 768 + 16)))\n", " aprint(range(array.size - 16 - 256, array.size - 256 + 16))\n", " aprint(range(array.size - 16, array.size), noend=True)\n", "\n", "\n", "class ReverseIScan(object):\n", " \"\"\"Create a scan object that scans values using a given binary\n", " function. The binary function is compiled once and cached inside this\n", " object. 
Keeping this object alive will prevent re-compilation.\n", " \"\"\"\n", "\n", " _kernels_block_scan = {}\n", " _kernels_block_map = {}\n", "\n", " _WARP_SIZE = 32\n", " _NUM_WARPS = 8\n", "\n", " @staticmethod\n", " def _gpu_kernel_block_scan_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the block-scan problem...\n", "\n", " This function returns a Cuda Kernel that does block-scan of some data using a given binary functor.\"\"\"\n", "\n", " scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)\n", "\n", " max_block_size = ReverseIScan._NUM_WARPS * ReverseIScan._WARP_SIZE\n", "\n", " @cuda.jit(device=True, cache=True)\n", " def load_shared_memory(shared_memory, d_input):\n", " local_tid = cuda.threadIdx.x\n", " tid = cuda.grid(1)\n", " # TODO: load data into shared memory\n", "\n", " @cuda.jit(device=True, cache=True)\n", " def pointer_jumping(shared_memory, jump):\n", " tid = cuda.threadIdx.x\n", " # TODO: implement the pointer jumping\n", "\n", " @cuda.jit(device=True, cache=True)\n", " def save_shared_memory(shared_memory, d_output, d_extras):\n", " local_tid = cuda.threadIdx.x\n", " global_tid = cuda.grid(1)\n", " # TODO: save data from shared memory\n", "\n", " def gpu_scan_block(d_input, d_output, d_extras):\n", " \"\"\"\n", " Per block SCAN...\n", " \"\"\"\n", " # move data to shared memory\n", " shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)\n", " load_shared_memory(shared_memory, d_input)\n", "\n", " # implements the logics\n", " jump = 1\n", " while jump < cuda.blockDim.x:\n", " pointer_jumping(shared_memory, jump)\n", " jump *= 2\n", "\n", " # now stores the result\n", " save_shared_memory(shared_memory, d_output, d_extras)\n", "\n", " return cuda.jit(func_or_sig=gpu_scan_block, cache=True)\n", "\n", " def __init__(self, functor):\n", " \"\"\"\n", " :param functor: A function implementing a binary operation for\n", " scan. 
It will be compiled as a CUDA device\n", " function using ``cuda.jit(device=True)``.\n", " \"\"\"\n", " self._functor = functor\n", "\n", " def _compile_block_scan(self, dtype):\n", " key = self._functor, dtype\n", " if key not in self._kernels_block_scan:\n", " self._kernels_block_scan[key] = ReverseIScan._gpu_kernel_block_scan_factory(self._functor,\n", " from_dtype(dtype))\n", " return self._kernels_block_scan[key]\n", "\n", " @staticmethod\n", " def _gpu_kernel_block_map_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the block-map problem...\n", "\n", " This function returns a Cuda Kernel that does block-map of some data using a given binary functor.\"\"\"\n", "\n", " scan_op = cuda.jit(func_or_sig=fn, device=True, cache=True)\n", "\n", " def gpu_map_block(d_io, d_scan):\n", " \"\"\"\n", " Per block MAP...\n", " \"\"\"\n", " # TODO: implements the logics\n", " tid = cuda.grid(1)\n", " bid = cuda.blockIdx.x\n", " if bid + 1 == cuda.gridDim.x:\n", " # skip the last block (no map needed)\n", " return\n", " extra = d_scan[bid + 1]\n", " d_io[tid] = scan_op(extra, d_io[tid])\n", "\n", " return cuda.jit(func_or_sig=gpu_map_block, cache=True)\n", "\n", " def _compile_block_map(self, dtype):\n", " key = self._functor, dtype\n", " if key not in self._kernels_block_map:\n", " self._kernels_block_map[key] = ReverseIScan._gpu_kernel_block_map_factory(self._functor, from_dtype(dtype))\n", " return self._kernels_block_map[key]\n", "\n", " def __call__(self, d_input, d_output, stream=cuda.default_stream()):\n", " \"\"\" Performs a per-block SCAN.\n", "\n", " :param d_input: A device array with input data.\n", " :param d_output: A device array to store the result.\n", " :param stream: Optional CUDA stream in which to perform the scan.\n", " If no stream is specified, the default stream of 0 is used.\n", " \"\"\"\n", "\n", " # ensure 1d array\n", " if d_input.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if d_input.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " # ensure arrays' size are the same\n", " if d_input.size != d_output.size:\n", " raise ValueError(\"arrays' length are different ({d_input.size} / {d_output.size}\")\n", "\n", " _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False\n", "\n", " kernel_step1 = self._compile_block_scan(d_input.dtype)\n", " kernel_step2 = self._compile_block_map(d_input.dtype)\n", "\n", " max_block_size = ReverseIScan._WARP_SIZE * ReverseIScan._NUM_WARPS\n", " nb_threads = min(max_block_size, d_input.size)\n", " nb_blocks = (d_input.size + nb_threads - 1) // nb_threads\n", " extras = cuda.device_array(shape=nb_blocks, dtype=d_input.dtype)\n", "\n", " # Perform the reduction on the GPU\n", " start_event = cuda.event(True)\n", " start_event.record(stream=stream)\n", "\n", " kernel_step1[nb_blocks, nb_threads, stream](d_input, d_output, extras)\n", " # print(f\"launch kernel_step1[{nb_blocks}, {nb_threads}]\")\n", " if nb_blocks > 1:\n", " # display(extras, all=True)\n", " self(extras, extras, stream)\n", " kernel_step2[nb_blocks, nb_threads, stream](d_output, extras)\n", "\n", " stop_event = cuda.event(True)\n", " stop_event.record(stream=stream)\n", " stop_event.synchronize()\n", "\n", " core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav\n", "\n", " # display(extras)\n", "\n", " return cuda.event_elapsed_time(start_event, stop_event)\n", "\n", "\n", "if __name__ == \"__main__\":\n", "\n", " def check(expected, result):\n", " it_works = 
np.array_equal(expected, result)\n", " if it_works:\n", " print(f'- seems to work')\n", " else:\n", " print(f'- seems to not work, here is your result:')\n", " display(result)\n", " display(expected)\n", "\n", "\n", " def test_int32(size):\n", " scanner = ReverseIScan(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.int32)\n", " d_result = cuda.device_array(size, dtype=np.int32)\n", " scanner(cuda.to_device(h_array), d_result, np.int32(0)) # turn on gpu\n", " ct = scanner(cuda.to_device(h_array), d_result, np.int32(0))\n", " print(f\"scan computation time is {ct} ms\")\n", " expected = np.flip(np.cumsum(h_array))\n", " check(expected, d_result.copy_to_host())\n", "\n", "\n", " def test_float32(size):\n", " scanner = ReverseIScan(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.float32)\n", " d_result = cuda.device_array(size, dtype=np.float32)\n", " scanner(cuda.to_device(h_array), d_result, np.float32(0.0)) # turn on gpu\n", " ct = scanner(cuda.to_device(h_array), d_result, np.float32(0.0))\n", " print(f\"scan computation time is {ct} ms\")\n", " expected = np.flip(np.cumsum(h_array))\n", " check(expected, d_result.copy_to_host())\n", "\n", "\n", " def test_float64(size):\n", " scanner = ReverseIScan(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.float64)\n", " d_result = cuda.device_array(size, dtype=np.float64)\n", " scanner(cuda.to_device(h_array), d_result, np.float64(0.0)) # turn on gpu\n", " ct = scanner(cuda.to_device(h_array), d_result, np.float64(0.0))\n", " print(f\"scan computation time is {ct} ms\")\n", " expected = np.flip(np.cumsum(h_array))\n", " check(expected, d_result.copy_to_host())\n", "\n", "\n", " test_int32(1 << 24)\n", " # with float32, the sequential cumsum fails with more than 2^24 number...\n", " test_float32(1 << 24)\n", " test_float64(1 << 24)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise 3\n", "The Partition pattern needs also a Unary Map pattern to generate the array of predicates... \n", "Fill the following classes to implement the kernel logics of unary and binary map. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class UnaryMap:\n", " _kernels_cache = {}\n", "\n", " def __init__(self: UnaryMap, functor) -> None:\n", " self.__functor = functor\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the unary transform problem...\n", "\n", " This function returns a Cuda Kernel that does the unary map of some data using a given unary functor.\"\"\"\n", "\n", " umap_op = cuda.jit(func_or_sig=fn, device=True, cache=True)\n", "\n", " def kernel(d_input, d_output) -> None:\n", " tid = cuda.grid(1)\n", " # TODO\n", "\n", " return cuda.jit(func_or_sig=kernel, cache=True)\n", "\n", " @staticmethod\n", " def _compile(functor, dtype):\n", "\n", " key = functor, dtype\n", " if key not in UnaryMap._kernels_cache:\n", " UnaryMap._kernels_cache[key] = UnaryMap._gpu_kernel_factory(functor, from_dtype(dtype))\n", "\n", " return UnaryMap._kernels_cache[key]\n", "\n", " def __call__(self, arr, res=None, stream=0):\n", " \"\"\"Performs a full unary transform.\n", "\n", " :param arr: A host or device array.\n", " :param res: Optional device array into which to write the transform\n", " result to. The result is written into the first element of\n", " this array. 
If this parameter is specified, then no\n", " communication of the map output takes place from the\n", " device to the host.\n", " :param stream: Optional CUDA stream in which to perform the transform.\n", " If no stream is specified, the default stream of 0 is used.\n", " :return: If ``res`` is specified, ``None`` is returned. Otherwise, the\n", " result of the reduction is returned.\n", " \"\"\"\n", "\n", " # ensure 1d array\n", " if arr.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if arr.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " kernel = self._compile(self.__functor, arr.dtype)\n", "\n", " # Perform the MAP on the GPU\n", " nb_threads = 512\n", " nb_blocks = (arr.size + nb_threads - 1) // nb_threads\n", "\n", " dest = res\n", " if dest is None:\n", " dest = cuda.device_array_like(arr)\n", "\n", " elif dest.size != arr.size:\n", " raise ValueError(\"destination array's length does not equal input one\")\n", "\n", " kernel[nb_blocks, nb_threads, stream](arr, dest)\n", "\n", " if res is not None:\n", " return None\n", "\n", " return dest\n", "\n", "\n", "class BinaryMap:\n", " _kernels_cache = {}\n", "\n", " def __init__(self: BinaryMap, functor) -> None:\n", " self.__functor = functor\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the binary transform problem...\n", "\n", " This function returns a Cuda Kernel that does the binary map of some data using a given binary functor.\"\"\"\n", "\n", " map_op = cuda.jit(func_or_sig=fn, device=True, cache=True)\n", "\n", " def kernel(d_input_left, d_input_right, d_output) -> None:\n", " tid = cuda.grid(1)\n", " # TODO\n", "\n", " return cuda.jit(func_or_sig=kernel, cache=True)\n", "\n", " @staticmethod\n", " def _compile(functor, dtype):\n", "\n", " key = functor, dtype\n", " if key not in BinaryMap._kernels_cache:\n", " BinaryMap._kernels_cache[key] = BinaryMap._gpu_kernel_factory(functor, from_dtype(dtype))\n", "\n", " return BinaryMap._kernels_cache[key]\n", "\n", " def __call__(self, left, right, res=None, stream=0):\n", " \"\"\"Performs a full binary transform.\n", "\n", " :param left: A host or device array, used as the left operand.\n", " :param right: A host or device array, used as the right operand.\n", " :param res: Optional device array into which to write the transform\n", " result to. The result is written into the first element of\n", " this array. If this parameter is specified, then no\n", " communication of the map output takes place from the\n", " device to the host.\n", " :param stream: Optional CUDA stream in which to perform the transform.\n", " If no stream is specified, the default stream of 0 is used.\n", " :return: If ``res`` is specified, ``None`` is returned. 
Otherwise, the\n", " result of the reduction is returned.\n", " \"\"\"\n", "\n", " # ensure 1d array\n", " if left.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if left.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " # ensure same size\n", " if left.size != right.size:\n", " raise ValueError(\"input arrays' length are not equal\")\n", "\n", " dest = res\n", " if dest is None:\n", " dest = cuda.device_array_like(left)\n", "\n", " elif dest.size != left.size:\n", " raise ValueError(\"destination array's length does not equal input ones\")\n", "\n", " kernel = self._compile(self.__functor, left.dtype)\n", "\n", " # Perform the MAP on the GPU\n", " nb_threads = 512\n", " nb_blocks = (left.size + nb_threads - 1) // nb_threads\n", "\n", " kernel[nb_blocks, nb_threads, stream](left, right, dest)\n", "\n", " if res is not None:\n", " return None\n", "\n", " return dest\n", "\n", "if __name__ == \"__main__\":\n", " def check(expected, result):\n", " if np.array_equal(expected, result):\n", " print(\"it seems to work!\")\n", " else:\n", " print(\"it doesn't work!\")\n", "\n", " def test_x2(size):\n", " rng = np.random.default_rng()\n", " h_array = rng.integers(low=0, high=1<<20, size=size, dtype=np.int32)\n", " d_array = cuda.to_device(h_array)\n", " d_result = cuda.device_array_like(d_array)\n", " umap = UnaryMap(lambda x: 2 * x)\n", " umap(d_array, d_result)\n", " check(h_array * 2, d_result.copy_to_host())\n", "\n", " def test_diff(size):\n", " rng = np.random.default_rng()\n", " h_left = rng.integers(low=0, high=1 << 20, size=size, dtype=np.int32)\n", " h_right = rng.integers(low=0, high=1 << 20, size=size, dtype=np.int32)\n", " d_left = cuda.to_device(h_left)\n", " d_right = cuda.to_device(h_right)\n", " d_result = cuda.device_array_like(d_left)\n", " bmap = BinaryMap(lambda l, r: l - r)\n", " bmap(d_left, d_right, d_result)\n", " check(h_left - h_right, d_result.copy_to_host())\n", "\n", " test_x2(1<<24)\n", " test_diff(1<<24)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise 4\n", "\n", "The partition ends with a permutation (the scatter). \n", "Fill the following class for this purpose... And since we are speaking about permutation, do also the gather for the same price :-)\n", "\n", "Notice that the mapping is given through a functor, that can internaly use an array for instance or some private data..." 
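,
    "\n",
    "\n",
    "For instance (a sketch only: the function name, `size`, `middle`, `d_src` and `d_dst` below are illustrative assumptions, mirroring the self-test at the end of the next cell), a scatter that sends even source indices to the first half and odd ones to the second half can be driven by an index-mapping functor such as:\n",
    "\n",
    "```python\n",
    "middle = size // 2  # size: assumed length of the array being scattered\n",
    "\n",
    "def even_odd_split(tid):\n",
    "    # the element read at index tid is written at the returned index\n",
    "    return tid // 2 if tid % 2 == 0 else middle + tid // 2\n",
    "\n",
    "scatterer = Scatter(even_odd_split)\n",
    "scatterer(d_src, d_dst)\n",
    "```"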
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Scatter:\n", " _kernels_cache = {}\n", "\n", " def __init__(self: Scatter, functor: Callable) -> None:\n", " self.__functor = functor\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn: Callable, np_type: Any) -> cuda.FakeCUDAKernel:\n", " \"\"\"Factory of kernels for the scatter permutation...\n", "\n", " This function returns a Cuda Kernel that does the permutation of some data.\"\"\"\n", "\n", " map_op = cuda.jit(func_or_sig=fn, device=True, cache=True)\n", "\n", " def kernel(d_input, d_output) -> None:\n", " tid = cuda.grid(1)\n", " # TODO\n", "\n", " return cuda.jit(func_or_sig=kernel, cache=True)\n", "\n", " @staticmethod\n", " def _compile(functor: Callable, dtype: Any) -> cuda.FakeCUDAKernel:\n", "\n", " key = functor, dtype\n", " if key not in Scatter._kernels_cache:\n", " Scatter._kernels_cache[key] = Scatter._gpu_kernel_factory(functor, from_dtype(dtype))\n", "\n", " return Scatter._kernels_cache[key]\n", "\n", " def __call__(self, src: DeviceNDArray, dest: DeviceNDArray, stream: int = 0) -> None:\n", " \"\"\"Performs a full scatter.\n", "\n", " :param src: A host or device array.\n", " :param dst: Device array into which to write the permuted data to.\n", " :param stream: Optional CUDA stream in which to perform the transform.\n", " If no stream is specified, the default stream of 0 is used.\n", " \"\"\"\n", "\n", " # ensure device array\n", " cuda.cudadrv.devicearray.require_cuda_ndarray(src)\n", " cuda.cudadrv.devicearray.require_cuda_ndarray(dest)\n", "\n", " # ensure 1d array\n", " if src.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if src.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " # ensure same shape/size\n", " if (src.shape != dest.shape) or (src.size != dest.size):\n", " raise ValueError(\"source and destination arrays' length are different\")\n", "\n", " kernel = self._compile(self.__functor, src.dtype)\n", "\n", " # Perform the SCATTER on the GPU\n", " nb_threads = 512\n", " nb_blocks = (src.size + nb_threads - 1) // nb_threads\n", " kernel[nb_blocks, nb_threads, stream](src, dest)\n", "\n", "\n", "class Gather:\n", " _kernels_cache = {}\n", "\n", " def __init__(self: Gather, functor: Callable) -> None:\n", " self.__functor = functor\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn: Callable, np_type: Any) -> cuda.FakeCUDAKernel:\n", " \"\"\"Factory of kernels for the gather permutation...\n", "\n", " This function returns a Cuda Kernel that does the permutation of some data.\"\"\"\n", "\n", " map_op = cuda.jit(device=True)(fn)\n", "\n", " def kernel(d_input, d_output) -> None:\n", " tid = cuda.grid(1)\n", " # TODO\n", "\n", " return cuda.jit(func_or_sig=kernel, cache=True)\n", "\n", " @staticmethod\n", " def _compile(functor: Callable, dtype: Any) -> cuda.FakeCUDAKernel:\n", "\n", " key = functor, dtype\n", " if key not in Gather._kernels_cache:\n", " Gather._kernels_cache[key] = Gather._gpu_kernel_factory(functor, from_dtype(dtype))\n", "\n", " return Gather._kernels_cache[key]\n", "\n", " def __call__(self, src: DeviceNDArray, dest: DeviceNDArray, stream: int = 0) -> None:\n", " \"\"\"Performs a full gather.\n", "\n", " :param src: A host or device array.\n", " :param dst: Device array into which to write the permuted data to.\n", " :param stream: Optional CUDA stream in which to perform the transform.\n", " If no stream is specified, the default stream of 0 is used.\n", 
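    "\n",
    "        Usage sketch (the index mapping below is only an assumed example; the\n",
    "        convention ``dest[i] = src[functor(i)]`` is the one suggested by the\n",
    "        self-test at the bottom of this cell)::\n",
    "\n",
    "            gatherer = Gather(lambda i: (i * 3) % n)   # n: assumed array length\n",
    "            gatherer(d_src, d_dst)\n",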
" \"\"\"\n", "\n", " # ensure device array\n", " cuda.cudadrv.devicearray.require_cuda_ndarray(src)\n", " cuda.cudadrv.devicearray.require_cuda_ndarray(dest)\n", "\n", " # ensure 1d array\n", " if src.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if src.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " # ensure same shape/size\n", " if (src.shape != dest.shape) or (src.size != dest.size):\n", " raise ValueError(\"arrays' share/size are not equal\")\n", "\n", " kernel = self._compile(self.__functor, src.dtype)\n", "\n", " # Perform the gather on the GPU\n", " nb_threads = 512\n", " nb_blocks = (src.size + nb_threads - 1) // nb_threads\n", "\n", " kernel[nb_blocks, nb_threads, stream](src, dest)\n", "\n", "\n", "if __name__ == \"__main__\":\n", "\n", " def check(array, expected, msg):\n", " error = 0\n", " middle = array.size // 2\n", " for i in range(array.size):\n", " e = expected[0 if i < middle else 1]\n", " if array[i] != e:\n", " error = error + 1\n", " if error > 0:\n", " print(f\"{msg} does not work: {error} errors generated\")\n", " else:\n", " print(f\"{msg} seems to work\")\n", "\n", " def test_scatter(size):\n", " h_arr = np.array([0,1], dtype=np.int32)\n", " h_arr = np.tile(h_arr, size//2)\n", " d_arr = cuda.to_device(h_arr)\n", " d_dest = cuda.device_array_like(d_arr)\n", "\n", " middle = size // 2\n", "\n", " def map(tid):\n", " if (tid % 2) == 0:\n", " return tid // 2\n", " return middle + (tid//2)\n", "\n", " scatterer = Scatter(map)\n", " scatterer(d_arr, d_dest)\n", "\n", " d_dest.copy_to_host(h_arr)\n", "\n", " check(h_arr, [0,1], \"scatter\")\n", "\n", " def test_gather(size):\n", " h_arr = np.array([0,1], dtype=np.int32)\n", " h_arr = np.tile(h_arr, size//2)\n", " d_arr = cuda.to_device(h_arr)\n", " d_dest = cuda.device_array_like(d_arr)\n", "\n", " middle = size // 2\n", "\n", " def map(tid):\n", " if tid < middle:\n", " return 2 * tid\n", " tid = tid - middle\n", " return 1 + 2 * tid\n", "\n", " gatherer = Gather(map)\n", " gatherer(d_arr, d_dest)\n", "\n", " d_dest.copy_to_host(h_arr)\n", "\n", " check(h_arr, [0,1], \"gather\")\n", "\n", " test_scatter(1<<20)\n", " test_gather(1<<20)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise 5\n", "Now that you have everything, writing the partition pattern you must do.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class Partition:\n", " \"\"\"\n", " Partition parallel pattern.\n", "\n", " Do a partition of a vector of data, using a functor that implements the predicate.\n", " All values for which the predicate is true are placed first, then the others.\n", " This pattern respect the relative order of the two subsets of data.\n", " \"\"\"\n", " _scatter_kernels_cache = {}\n", "\n", " def __init__(self: Partition, predicate: Callable) -> None:\n", " d_predicate = cuda.jit(func_or_sig=predicate, device=True, cache=True)\n", " self.__predicate = d_predicate\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(predicate, np_type: Any) -> cuda.FakeCUDAKernel:\n", " \"\"\"Factory of kernels for the scatter permutation...\n", "\n", " This function returns a Cuda Kernel that does the permutation of some data.\"\"\"\n", "\n", " def kernel(d_input, d_output, d_up, d_down) -> None:\n", " tid = cuda.grid(1)\n", " # TODO: the scatter using predicate...\n", " \n", " return cuda.jit(func_or_sig=kernel, cache=True)\n", "\n", " @staticmethod\n", " def _compile(functor: 
Callable, dtype: Any) -> cuda.FakeCUDAKernel:\n", "\n", " key = functor, dtype\n", " if key not in Partition._scatter_kernels_cache:\n", " Partition._scatter_kernels_cache[key] = Partition._gpu_kernel_factory(functor, from_dtype(dtype))\n", "\n", " return Partition._scatter_kernels_cache[key]\n", "\n", " def __call__(self, src: DeviceNDArray, dest: DeviceNDArray,\n", " stream: Stream = cuda.default_stream()) -> float:\n", " cuda.cudadrv.devicearray.require_cuda_ndarray(src)\n", " cuda.cudadrv.devicearray.require_cuda_ndarray(dest)\n", "\n", " # ensure 1d array\n", " if src.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if src.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " # ensure same shape/size\n", " if (src.shape != dest.shape) or (src.size != dest.size):\n", " raise ValueError(\"source and destination arrays' length are different\")\n", "\n", " up = cuda.device_array(shape=src.shape, dtype=np.int32, stream=stream)\n", " down = cuda.device_array(shape=src.shape, dtype=np.int32, stream=stream)\n", "\n", " start_event = cuda.event(True)\n", " start_event.record(stream=stream)\n", " \n", " # TODO: fill the logic\n", " \n", " stop_event = cuda.event(True)\n", " stop_event.record(stream=stream)\n", " stop_event.synchronize()\n", "\n", " return cuda.event_elapsed_time(start_event, stop_event)\n", "\n", "\n", "if __name__ == \"__main__\":\n", " def check(result, reversed):\n", " error = 0\n", " if reversed:\n", " nb_ones = 0\n", " for i in range(1, len(result)):\n", " if result[i] == 1:\n", " nb_ones += 1\n", " elif nb_ones > 0:\n", " error += 1\n", " else:\n", " nb_zeros = 0\n", " for i in range(1, len(result)):\n", " if result[i] == 0:\n", " nb_zeros += 1\n", " elif nb_zeros>0:\n", " error += 1\n", " if error == 0:\n", " print(\"well done, seems to work\")\n", " else:\n", " print(f\"ouch, you have {error} errors\")\n", " display(result)\n", "\n", "\n", " def test_int32(size, reversed=False):\n", " rng = np.random.default_rng()\n", " h_array = rng.integers(low=0, high=2, size=size, dtype=np.int32)\n", " d_array = cuda.to_device(h_array)\n", " d_result = cuda.device_array(shape=d_array.shape, dtype=d_array.dtype)\n", " # the predicate move 0 first (so it inverses the value)\n", " partition = Partition(lambda x: 1 - x) if reversed else Partition(lambda x: x)\n", " partition(d_array, d_result, np.int32(0))\n", " ct = partition(d_array, d_result, np.int32(0))\n", " check(d_result.copy_to_host(), reversed)\n", " print(f\"Computation time is {ct} ms\")\n", "\n", "\n", " def test_int64(size, reversed=False):\n", " rng = np.random.default_rng()\n", " h_array = rng.integers(low=0, high=2, size=size, dtype=np.int64)\n", " d_array = cuda.to_device(h_array)\n", " d_result = cuda.device_array(shape=d_array.shape, dtype=d_array.dtype)\n", " # the predicate move 0 first (so it inverses the value)\n", " partition = Partition(lambda x: 1 - x) if reversed else Partition(lambda x: x)\n", " partition(d_array, d_result, np.int64(0))\n", " ct = partition(d_array, d_result, np.int64(0))\n", " check(d_result.copy_to_host(), reversed)\n", " print(f\"Computation time is {ct} ms\")\n", "\n", "\n", " test_int32(1 << 25)\n", " test_int64(1 << 25)\n", " test_int32(1 << 25, True)\n", " test_int64(1 << 25, True)\n" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "## Exercise 6\n", "To conclude this labwork, you must write the radix sort algorithm.\n", "This algorithm should work for integers only, with a given size (e.g. 
4 bytes)...\n",
    "\n",
    "To optimize it, you may allocate a temporary array and ping-pong between buffers: partition from src into the temporary array, then from the temporary array into dst, and so on, alternating between dst and the temporary array so that the final pass leaves the sorted data in dst..."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "class RadixSort:\n",
    "\n",
    "    def __call__(self, src, dest):\n",
    "        cuda.cudadrv.devicearray.require_cuda_ndarray(src)\n",
    "        cuda.cudadrv.devicearray.require_cuda_ndarray(dest)\n",
    "\n",
    "        # ensure 1d array\n",
    "        if src.ndim != 1:\n",
    "            raise TypeError(\"only support 1D array\")\n",
    "\n",
    "        # ensure size > 0\n",
    "        if src.size < 1:\n",
    "            raise ValueError(\"array's length is 0\")\n",
    "\n",
    "        # ensure same shape/size\n",
    "        if (src.shape != dest.shape) or (src.size != dest.size):\n",
    "            raise ValueError(\"source and destination arrays' length are different\")\n",
    "\n",
    "        nb_bytes = src.nbytes // src.size\n",
    "\n",
    "        # ping-pong buffers: temp[0] is the caller's dest, temp[1] is scratch\n",
    "        temp = [\n",
    "            dest,\n",
    "            cuda.device_array(shape=src.shape, dtype=src.dtype)\n",
    "        ]\n",
    "\n",
    "        temp[0].copy_to_device(src)\n",
    "\n",
    "        ct = 0.0\n",
    "\n",
    "        for bit in range(0, 8 * nb_bytes, 2):\n",
    "            # TODO\n",
    "            pass\n",
    "\n",
    "        return ct\n",
    "\n",
    "\n",
    "if __name__ == \"__main__\":\n",
    "    def check(result, expected):\n",
    "        answer = np.array_equal(expected, result)\n",
    "        if answer:\n",
    "            print(\"well done, seems to work\")\n",
    "        else:\n",
    "            print(\"ouch, it doesn't work\")\n",
    "            display(result)\n",
    "\n",
    "\n",
    "    def test_int32(size):\n",
    "        rng = np.random.default_rng()\n",
    "        h_array = rng.integers(low=0, high=(1<<31)-1, size=size, dtype=np.int32)\n",
    "        d_array = cuda.to_device(h_array)\n",
    "        d_result = cuda.device_array(shape=d_array.shape, dtype=d_array.dtype)\n",
    "        _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False\n",
    "        sort = RadixSort()\n",
    "        sort(d_array, d_result)  # warm-up run\n",
    "        ct = sort(d_array, d_result)\n",
    "        print(f\"Computation time is {ct} ms\")\n",
    "        check(d_result.copy_to_host(), np.sort(h_array))\n",
    "        core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav\n",
    "\n",
    "\n",
    "    test_int32(1 << 25)\n"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}