{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Labwork 2" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the following cell to ensure numba is installed (skip if you are sure)" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Defaulting to user installation because normal site-packages is not writeable\n", "Requirement already satisfied: numba in c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages (0.55.1)\n", "Requirement already satisfied: llvmlite<0.39,>=0.38.0rc1 in c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages (from numba) (0.38.0)\n", "Requirement already satisfied: numpy<1.22,>=1.18 in c:\\program files\\python\\lib\\site-packages (from numba) (1.21.2)\n", "Requirement already satisfied: setuptools in c:\\program files\\python\\lib\\site-packages (from numba) (57.4.0)\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ "WARNING: Ignoring invalid distribution -ypy (c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages)\n", "WARNING: Ignoring invalid distribution -ypy (c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages)\n", "WARNING: Ignoring invalid distribution -ypy (c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages)\n", "WARNING: Ignoring invalid distribution -ypy (c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages)\n", "WARNING: Ignoring invalid distribution -ypy (c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages)\n", "WARNING: Ignoring invalid distribution -ypy (c:\\users\\laveneau\\appdata\\roaming\\python\\python39\\site-packages)\n" ] } ], "source": [ "!pip install numba" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the following cell to verify that numba can use Cuda..." ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "is cuda available? yes\n" ] } ], "source": [ "%%python\n", "from numba import cuda\n", "\n", "def bool_to_str(predicate: bool) -> str:\n", " if predicate:\n", " return 'yes'\n", " return 'no'\n", "\n", "def is_cuda_available_str() -> str:\n", " return bool_to_str(cuda.is_available())\n", "\n", "print(f'is cuda available? {is_cuda_available_str()}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Another simple solution to detect and print the devices from [numba documentation](https://numba.readthedocs.io/en/stable/cuda-reference/host.html#device-management):" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Found 1 CUDA devices\n", "id 0 b'Quadro T2000 with Max-Q Design' [SUPPORTED]\n", " Compute Capability: 7.5\n", " PCI Device ID: 0\n", " PCI Bus ID: 1\n", " UUID: GPU-d0ef4cd4-a0bf-4d14-21df-26d411426265\n", " Watchdog: Enabled\n", " Compute Mode: WDDM\n", " FP32/FP64 Performance Ratio: 32\n", "Summary:\n", "\t1/1 devices are supported\n" ] } ], "source": [ "%%python\n", "from numba import cuda\n", "\n", "if not cuda.detect():\n", " raise Exception(\"we do not have cuda :-(\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Run the following cell for a short test of numba/cuda..." ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "it woks!\n" ] } ], "source": [ "%%python\n", "# This is a MAP example\n", "from numba import cuda\n", "import numpy as np \n", "\n", "\n", "@cuda.jit\n", "def increment_by_one(an_array):\n", " # Thread id in a 1D block\n", " tx = cuda.threadIdx.x\n", " # Block id in a 1D grid\n", " ty = cuda.blockIdx.x\n", " # Block width, i.e. number of threads per block\n", " bw = cuda.blockDim.x\n", " # Compute flattened index inside the array\n", " pos = tx + ty * bw\n", " if pos < an_array.size: # Check array boundaries\n", " an_array[pos] += 1\n", " \n", "\n", "if __name__ == \"__main__\":\n", " # build a big vector\n", " h_array = np.arange(1<<16)\n", " \n", " d_array = cuda.to_device(h_array)\n", " \n", " threads_per_block = 32*8 # 8 warps, 256 threads per block\n", " blocks_per_grid = (h_array.size + (threads_per_block - 1)) // threads_per_block\n", " increment_by_one[blocks_per_grid, threads_per_block](d_array)\n", " \n", " d_array.copy_to_host(h_array)\n", " \n", " for i in range(len(h_array)):\n", " assert h_array[i] == i+1, f'bad value at index {i}: {h_array[i]+1}'\n", " print('it woks!')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From here, it is clear that some code are easy to do:\n", "- binary transform\n", "- gather\n", "- scatter\n", "- and basic reduce\n", "\n", "The first labwork (of week 2) introduces a quite simple but not generic reduce. \n", "This version suffers from many default:\n", "- the operation is hard-coded ;\n", "- this version is not optimal considering data loading...\n", "\n", "The last point needs some explanations. Actually, when a thread loads one data, it is through a vector-instruction and implies the collaboration of the 32 threads of the same warp. \n", "If all the threads of a warp needs to load data, then we have different cases:\n", "1. The threads are loading **coalescent** data (contiguous, located into a 128 bytes array or less).\n", "2. The threads are not loading **coalescent** data...\n", "\n", "For the first case, actually the 32 loads are managed together through a single vector operation. \n", "For the second case, then the SIMD processor needs (up to) 32 data load instructions. \n", "\n", "Obviously the first case if much more fast than the second! Especially when you consider the data loading processing time, which is quite huge (some hundreds of cycles!). It is very important to try to load all the data in the first configuration with coalescent loading. The same for the writing. \n", "\n", "Then, in this lab work you will revisit the REDUCE pattern, considering different optimizations regarding the data loading and the operator properties (associative, but also commutativity)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## First exercise\n", "In this first version, we restart with the more general REDUCE version given in the lectures 1 and 2.\n", "\n", "Here, the operator is associative but not commutative (for instance, matrix multiplication is such an operator!). \n", "\n", "Take a look a the implementation, especially to the kernel...\n", "\n", "Complete the kernel where needed only (TODO keyword)." ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "maximum seems to work\n" ] } ], "source": [ "from __future__ import annotations\n", "\n", "import numpy as np\n", "from numba.np.numpy_support import from_dtype\n", "from numba import cuda, core\n", "\n", "\n", "class BasicReduce(object):\n", " \"\"\"Create a reduction object that reduces values using a given binary\n", " function. The binary function is compiled once and cached inside this\n", " object. Keeping this object alive will prevent re-compilation.\n", " \"\"\"\n", "\n", " _cache = {}\n", "\n", " _WARP_SIZE = 32\n", " _NUM_WARPS = 32\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the reduction problem...\n", "\n", " This function returns a Cuda Kernel that does the reduction of some data using a given binary functor.\"\"\"\n", "\n", " reduce_op = cuda.jit(device=True)(fn)\n", "\n", " max_block_size = BasicReduce._NUM_WARPS * BasicReduce._WARP_SIZE\n", "\n", " @cuda.jit(device=True)\n", " def load_shared_memory(shared_memory, arr, null_value):\n", " global_tid = cuda.grid(1)\n", " local_tid = cuda.threadIdx.x\n", " # TODO: fill shared_memory\n", " \n", " # wait all warps\n", " cuda.syncthreads()\n", "\n", " @cuda.jit(device=True)\n", " def pointer_jumping(shared_memory, jump):\n", " local_tid = cuda.threadIdx.x\n", " right = local_tid + jump\n", " # TODO: implement the pointer jumping with synchronization\n", "\n", " def gpu_reduce_block(arr, partials, null_value):\n", " \"\"\"\n", " Per block reduction, basic version...\n", " \"\"\"\n", " # move data to shared memory\n", " shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)\n", " load_shared_memory(shared_memory, arr, null_value)\n", "\n", " # do the jumping...\n", " blk_size = cuda.blockDim.x\n", " jump = 1\n", " while jump < blk_size:\n", " pointer_jumping(shared_memory, jump)\n", " jump = jump * 2\n", "\n", " # now stores the result\n", " if cuda.threadIdx.x == 0:\n", " blk_id = cuda.blockIdx.x\n", " partials[blk_id] = shared_memory[0]\n", "\n", " return cuda.jit(gpu_reduce_block)\n", "\n", " def __init__(self, functor):\n", " \"\"\"\n", " :param functor: A function implementing a binary operation for\n", " reduction. It will be compiled as a CUDA device\n", " function using ``cuda.jit(device=True)``.\n", " \"\"\"\n", " self._functor = functor\n", "\n", " def _compile(self, dtype):\n", " key = self._functor, dtype\n", " if key not in self._cache:\n", " self._cache[key] = BasicReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))\n", " return self._cache[key]\n", "\n", " def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):\n", " \"\"\"Performs a full reduction.\n", "\n", " :param arr: A host or device array.\n", " :param null_value: zero value for array.dtype\n", " :param res: Device array into which to write the reduction\n", " result to. The result is written into the first element of\n", " this array if this array is provided.\n", " :param stream: Optional CUDA stream in which to perform the reduction.\n", " If no stream is specified, the default stream of 0 is\n", " used.\n", " \"\"\"\n", "\n", " # ensure 1d array\n", " if arr.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if arr.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False\n", "\n", " kernel = self._compile(arr.dtype)\n", "\n", " # Perform the reduction on the GPU\n", " start_event = cuda.event(True)\n", " start_event.record(stream=stream)\n", " nb_threads = self._NUM_WARPS * self._WARP_SIZE\n", "\n", " while True:\n", " if nb_threads > arr.size:\n", " nb_threads = arr.size\n", "\n", " nb_blocks = (arr.size + nb_threads - 1) // nb_threads\n", "\n", " temp = cuda.device_array(shape=nb_blocks, dtype=arr.dtype, stream=stream)\n", "\n", " kernel[nb_blocks, nb_threads, stream](arr, temp, null_value)\n", " print(f\"launch {nb_blocks} for {nb_threads * nb_blocks} threads\")\n", " cuda.synchronize()\n", " arr = temp\n", " if nb_blocks == 1:\n", " break\n", "\n", " stop_event = cuda.event(True)\n", " stop_event.record(stream=stream)\n", " stop_event.synchronize()\n", " ct = cuda.event_elapsed_time(start_event, stop_event)\n", " print(f\"kernel computation time is {ct} ms\")\n", "\n", " # handle return value\n", " if res is not None:\n", " res.copy_to_device(arr)\n", "\n", " core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav\n", "\n", " return arr.copy_to_host(stream=stream)[0]\n", "\n", "\n", "if __name__ == \"__main__\":\n", " def check(expected, result):\n", " if expected == result:\n", " print(f'- seems to work {result}')\n", " else:\n", " print(f'- seems to not work {result}')\n", "\n", " def test_int32(size):\n", " reducer = BasicReduce(lambda a,b: a+b)\n", " h_array = np.ones(size, dtype=np.int32)\n", " result = reducer(cuda.to_device(h_array), np.int32(0))\n", " check(h_array.size, result)\n", "\n", " def test_float32(size):\n", " reducer = BasicReduce(lambda a,b: a+b)\n", " h_array = np.ones(size, dtype=np.float32)\n", " result = reducer(cuda.to_device(h_array), np.float32(0.0))\n", " check(float(h_array.size), result)\n", "\n", " test_int32(1 << 28)\n", " test_float32(1 << 28)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Second exercise\n", "One big problem with the previous implementation is linked to the **register pressure**. \n", "\n", "A block of threads runs onto a SMP, with others blocks. \n", "But a SMP has a limited number of registers, to partition to all the threads running into blocks. \n", "\n", "When a program uses too much threads, then the number of blocks running onto a single SMP is reduced... That is a limit to the speed-up since, at the same time, using less blocks implies less threads and so longer computation time. \n", "\n", "Using shared memory impacts directly on the number of registers, since the shared memory uses SMP registers... It is then important to reduce the size of the shared memory, and so in our reduce algorithm the size of the blocks. \n", "\n", "Try the same code but with 256 warps only. Notice the computation times..." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from __future__ import annotations\n", "\n", "import numpy as np\n", "from numba.np.numpy_support import from_dtype\n", "from numba import cuda, core\n", "\n", "\n", "class BasicReduce(object):\n", " \"\"\"Create a reduction object that reduces values using a given binary\n", " function. The binary function is compiled once and cached inside this\n", " object. Keeping this object alive will prevent re-compilation.\n", " \"\"\"\n", "\n", " _cache = {}\n", "\n", " _WARP_SIZE = 32\n", " _NUM_WARPS = 32 # TODO: modify this value...\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the reduction problem...\n", "\n", " This function returns a Cuda Kernel that does the reduction of some data using a given binary functor.\"\"\"\n", "\n", " reduce_op = cuda.jit(device=True)(fn)\n", "\n", " max_block_size = BasicReduce._NUM_WARPS * BasicReduce._WARP_SIZE\n", "\n", " @cuda.jit(device=True)\n", " def load_shared_memory(shared_memory, arr, null_value):\n", " global_tid = cuda.grid(1)\n", " local_tid = cuda.threadIdx.x\n", " # TODO: fill shared_memory\n", " \n", " # wait all warps\n", " cuda.syncthreads()\n", "\n", " @cuda.jit(device=True)\n", " def pointer_jumping(shared_memory, jump):\n", " local_tid = cuda.threadIdx.x\n", " right = local_tid + jump\n", " # TODO: implement the pointer jumping with synchronization\n", "\n", " def gpu_reduce_block(arr, partials, null_value):\n", " \"\"\"\n", " Per block reduction, basic version...\n", " \"\"\"\n", " # move data to shared memory\n", " shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)\n", " load_shared_memory(shared_memory, arr, null_value)\n", "\n", " # do the jumping...\n", " blk_size = cuda.blockDim.x\n", " jump = 1\n", " while jump < blk_size:\n", " pointer_jumping(shared_memory, jump)\n", " jump = jump * 2\n", "\n", " # now stores the result\n", " if cuda.threadIdx.x == 0:\n", " blk_id = cuda.blockIdx.x\n", " partials[blk_id] = shared_memory[0]\n", "\n", " return cuda.jit(gpu_reduce_block)\n", "\n", " def __init__(self, functor):\n", " \"\"\"\n", " :param functor: A function implementing a binary operation for\n", " reduction. It will be compiled as a CUDA device\n", " function using ``cuda.jit(device=True)``.\n", " \"\"\"\n", " self._functor = functor\n", "\n", " def _compile(self, dtype):\n", " key = self._functor, dtype\n", " if key not in self._cache:\n", " self._cache[key] = BasicReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))\n", " return self._cache[key]\n", "\n", " def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):\n", " \"\"\"Performs a full reduction.\n", "\n", " :param arr: A host or device array.\n", " :param null_value: zero value for array.dtype\n", " :param res: Device array into which to write the reduction\n", " result to. The result is written into the first element of\n", " this array if this array is provided.\n", " :param stream: Optional CUDA stream in which to perform the reduction.\n", " If no stream is specified, the default stream of 0 is\n", " used.\n", " \"\"\"\n", "\n", " # ensure 1d array\n", " if arr.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if arr.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False\n", "\n", " kernel = self._compile(arr.dtype)\n", "\n", " # Perform the reduction on the GPU\n", " start_event = cuda.event(True)\n", " start_event.record(stream=stream)\n", " nb_threads = self._NUM_WARPS * self._WARP_SIZE\n", "\n", " while True:\n", " if nb_threads > arr.size:\n", " nb_threads = arr.size\n", "\n", " nb_blocks = (arr.size + nb_threads - 1) // nb_threads\n", "\n", " temp = cuda.device_array(shape=nb_blocks, dtype=arr.dtype, stream=stream)\n", "\n", " kernel[nb_blocks, nb_threads, stream](arr, temp, null_value)\n", " print(f\"launch {nb_blocks} for {nb_threads * nb_blocks} threads\")\n", " cuda.synchronize()\n", " arr = temp\n", " if nb_blocks == 1:\n", " break\n", "\n", " stop_event = cuda.event(True)\n", " stop_event.record(stream=stream)\n", " stop_event.synchronize()\n", " ct = cuda.event_elapsed_time(start_event, stop_event)\n", " print(f\"kernel computation time is {ct} ms\")\n", "\n", " # handle return value\n", " if res is not None:\n", " res.copy_to_device(arr)\n", "\n", " core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav\n", "\n", " return arr.copy_to_host(stream=stream)[0]\n", "\n", "\n", "if __name__ == \"__main__\":\n", " def check(expected, result):\n", " if expected == result:\n", " print(f'- seems to work {result}')\n", " else:\n", " print(f'- seems to not work {result}')\n", "\n", " def test_int32(size):\n", " reducer = BasicReduce(lambda a,b: a+b)\n", " h_array = np.ones(size, dtype=np.int32)\n", " result = reducer(cuda.to_device(h_array), np.int32(0))\n", " check(h_array.size, result)\n", "\n", " def test_float32(size):\n", " reducer = BasicReduce(lambda a,b: a+b)\n", " h_array = np.ones(size, dtype=np.float32)\n", " result = reducer(cuda.to_device(h_array), np.float32(0.0))\n", " check(float(h_array.size), result)\n", "\n", " test_int32(1 << 28)\n", " test_float32(1 << 28)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Third exercise\n", "The objective here is to optimize the former reduce.\n", "We considerer here that the binary operation is associative but not *commutative*.\n", "So, we cannot permute the data...\n", "\n", "The simpler optimization consists to do:\n", "- a reduce per warp,\n", "- write the per warp result at position warp_id (so `threadIdx.x // 32`),\n", "- conclude by a last reduce per warp for warp 0,\n", "- and save the per block result. \n", "\n", "Implement this strategy and note the speed-up..." ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "maximum seems to work\n" ] } ], "source": [ "from __future__ import annotations\n", "\n", "import numpy as np\n", "from numba.np.numpy_support import from_dtype\n", "from numba import cuda, core\n", "\n", "\n", "class AssociativeReduce(object):\n", " \"\"\"Create a reduction object that reduces values using a given binary\n", " function. The binary function is compiled once and cached inside this\n", " object. Keeping this object alive will prevent re-compilation.\n", " \"\"\"\n", "\n", " _cache = {}\n", "\n", " _WARP_SIZE = 32\n", " _NUM_WARPS = 8\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the reduction problem...\n", "\n", " This function returns a Cuda Kernel that does the reduction of some data using a given binary functor.\"\"\"\n", "\n", " reduce_op = cuda.jit(device=True)(fn)\n", "\n", " max_block_size = AssociativeReduce._NUM_WARPS * AssociativeReduce._WARP_SIZE\n", "\n", " @cuda.jit(device=True)\n", " def load_shared_memory(shared_memory, arr, null_value):\n", " global_tid = cuda.grid(1)\n", " local_tid = cuda.threadIdx.x\n", " # TODO: same as first exercise\n", "\n", " # wait all warps\n", " cuda.syncthreads()\n", "\n", " @cuda.jit(device=True)\n", " def reduce_per_warp(shared_memory):\n", " warp_id = cuda.threadIdx.x // 32\n", " in_warp_id = cuda.threadIdx.x % 32\n", " # TODO: implement the pointer jumping without synchronization\n", "\n", " def gpu_reduce_block(arr, partials, null_value):\n", " \"\"\"\n", " Per block reduction, basic version...\n", " \"\"\"\n", " # move data to shared memory\n", " shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)\n", " load_shared_memory(shared_memory, arr, null_value)\n", "\n", " # TODO: implements the logics\n", "\n", " # now stores the result\n", " if cuda.threadIdx.x == 0:\n", " blk_id = cuda.blockIdx.x\n", " partials[blk_id] = shared_memory[0]\n", "\n", " return cuda.jit(gpu_reduce_block)\n", "\n", " def __init__(self, functor):\n", " \"\"\"\n", " :param functor: A function implementing a binary operation for\n", " reduction. It will be compiled as a CUDA device\n", " function using ``cuda.jit(device=True)``.\n", " \"\"\"\n", " self._functor = functor\n", "\n", " def _compile(self, dtype):\n", " key = self._functor, dtype\n", " if key not in self._cache:\n", " self._cache[key] = AssociativeReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))\n", " return self._cache[key]\n", "\n", " def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):\n", " \"\"\"Performs a full reduction.\n", "\n", " :param arr: A host or device array.\n", " :param null_value: zero value for array.dtype\n", " :param res: Device array into which to write the reduction\n", " result to. The result is written into the first element of\n", " this array if this array is provided.\n", " :param stream: Optional CUDA stream in which to perform the reduction.\n", " If no stream is specified, the default stream of 0 is\n", " used.\n", " \"\"\"\n", "\n", " # ensure 1d array\n", " if arr.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if arr.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False\n", "\n", " kernel = self._compile(arr.dtype)\n", "\n", " # Perform the reduction on the GPU\n", " start_event = cuda.event(True)\n", " start_event.record(stream=stream)\n", " nb_threads = self._NUM_WARPS * self._WARP_SIZE\n", "\n", " while True:\n", " if nb_threads > arr.size:\n", " nb_threads = arr.size\n", "\n", " nb_blocks = (arr.size + nb_threads - 1) // nb_threads\n", "\n", " temp = cuda.device_array(shape=nb_blocks, dtype=arr.dtype, stream=stream)\n", "\n", " kernel[nb_blocks, nb_threads, stream](arr, temp, null_value)\n", " print(f\"launch {nb_blocks} for {nb_threads * nb_blocks} threads\")\n", " cuda.synchronize()\n", " arr = temp\n", " if nb_blocks == 1:\n", " break\n", "\n", " stop_event = cuda.event(True)\n", " stop_event.record(stream=stream)\n", " stop_event.synchronize()\n", " ct = cuda.event_elapsed_time(start_event, stop_event)\n", " print(f\"kernel computation time is {ct} ms\")\n", "\n", " # handle return value\n", " if res is not None:\n", " res.copy_to_device(arr)\n", "\n", " core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav\n", "\n", " return arr.copy_to_host(stream=stream)[0]\n", "\n", "\n", "if __name__ == \"__main__\":\n", " def check(expected, result):\n", " if expected == result:\n", " print(f'- seems to work {result}')\n", " else:\n", " print(f'- seems to not work {result}')\n", "\n", "\n", " def test_int32(size):\n", " reducer = AssociativeReduce(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.int32)\n", " result = reducer(cuda.to_device(h_array), np.int32(0))\n", " check(h_array.size, result)\n", "\n", "\n", " def test_float32(size):\n", " reducer = AssociativeReduce(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.float32)\n", " result = reducer(cuda.to_device(h_array), np.float32(0.0))\n", " check(float(h_array.size), result)\n", "\n", "\n", " test_int32(1 << 28)\n", " test_float32(1 << 28)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Fourth exercise\n", "Now, let us consider what happens with a *commutative* binary operation. \n", "In such a case, it is possible to reduce the number of Cuda blocks to *the number of threads per block* at most, and so to limit the number of recursive calls (at most 2 calls, the last with a single block). \n", "\n", "For this, each thread should:\n", "- do a local reduce in a private register with a modulo strategy, \n", "- and then terminate as in second exercise. \n", "\n", "Implements this strategy in the following cell. " ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "launch 65536 for 16777216 threads\n", "launch 256 for 65536 threads\n", "Maximum is 1048575\n", "maximum seems to work\n" ] } ], "source": [ "from __future__ import annotations\n", "\n", "import numpy as np\n", "from numba.np.numpy_support import from_dtype\n", "from numba import cuda, core\n", "\n", "\n", "class CommutativeReduce(object):\n", " \"\"\"Create a reduction object that reduces values using a given binary\n", " function. The binary function is compiled once and cached inside this\n", " object. Keeping this object alive will prevent re-compilation.\n", " \"\"\"\n", "\n", " _cache = {}\n", "\n", " _WARP_SIZE = 32\n", " _NUM_WARPS = 8\n", "\n", " @staticmethod\n", " def _gpu_kernel_factory(fn, np_type):\n", " \"\"\"Factory of kernels for the reduction problem...\n", "\n", " This function returns a Cuda Kernel that does the reduction of some data using a given binary functor.\"\"\"\n", "\n", " reduce_op = cuda.jit(device=True)(fn)\n", "\n", " max_block_size = CommutativeReduce._NUM_WARPS * CommutativeReduce._WARP_SIZE\n", "\n", " @cuda.jit(device=True)\n", " def load_shared_memory(shared_memory, arr, null_value):\n", " global_tid = cuda.grid(1)\n", " local_tid = cuda.threadIdx.x\n", " # TODO: load with a modulo scheme\n", "\n", " # wait all warps\n", " cuda.syncthreads()\n", "\n", " @cuda.jit(device=True)\n", " def reduce_per_warp(shared_memory):\n", " warp_id = cuda.threadIdx.x // 32\n", " in_warp_id = cuda.threadIdx.x % 32\n", " # TODO: implement the pointer jumping without synchronization\n", "\n", " def gpu_reduce_block(arr, partials, null_value):\n", " \"\"\"\n", " Per block reduction, basic version...\n", " \"\"\"\n", " # move data to shared memory\n", " shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)\n", " load_shared_memory(shared_memory, arr, null_value)\n", "\n", " # TODO: implements the logics\n", "\n", " # now stores the result\n", " if cuda.threadIdx.x == 0:\n", " blk_id = cuda.blockIdx.x\n", " partials[blk_id] = shared_memory[0]\n", "\n", " return cuda.jit(gpu_reduce_block)\n", "\n", " def __init__(self, functor):\n", " \"\"\"\n", " :param functor: A function implementing a binary operation for\n", " reduction. It will be compiled as a CUDA device\n", " function using ``cuda.jit(device=True)``.\n", " \"\"\"\n", " self._functor = functor\n", "\n", " def _compile(self, dtype):\n", " key = self._functor, dtype\n", " if key not in self._cache:\n", " self._cache[key] = CommutativeReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))\n", " return self._cache[key]\n", "\n", " def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):\n", " \"\"\"Performs a full reduction.\n", "\n", " :param arr: A host or device array.\n", " :param null_value: zero value for array.dtype\n", " :param res: Device array into which to write the reduction\n", " result to. The result is written into the first element of\n", " this array if this array is provided.\n", " :param stream: Optional CUDA stream in which to perform the reduction.\n", " If no stream is specified, the default stream of 0 is\n", " used.\n", " \"\"\"\n", "\n", " # ensure 1d array\n", " if arr.ndim != 1:\n", " raise TypeError(\"only support 1D array\")\n", "\n", " # ensure size > 0\n", " if arr.size < 1:\n", " raise ValueError(\"array's length is 0\")\n", "\n", " _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False\n", "\n", " kernel = self._compile(arr.dtype)\n", "\n", " # Perform the reduction on the GPU\n", " start_event = cuda.event(True)\n", " start_event.record(stream=stream)\n", " nb_threads = self._NUM_WARPS * self._WARP_SIZE\n", "\n", " while True:\n", " if nb_threads > arr.size:\n", " nb_threads = arr.size\n", "\n", " # TODO implements the logics\n", "\n", " stop_event = cuda.event(True)\n", " stop_event.record(stream=stream)\n", " stop_event.synchronize()\n", " ct = cuda.event_elapsed_time(start_event, stop_event)\n", " print(f\"kernel computation time is {ct} ms\")\n", "\n", " # handle return value\n", " if res is not None:\n", " res.copy_to_device(arr)\n", "\n", " core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav\n", "\n", " return arr.copy_to_host(stream=stream)[0]\n", "\n", "\n", "if __name__ == \"__main__\":\n", " def check(expected, result):\n", " if expected == result:\n", " print(f'- seems to work {result}')\n", " else:\n", " print(f'- seems to not work {result}')\n", "\n", "\n", " def test_int32(size):\n", " reducer = CommutativeReduce(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.int32)\n", " result = reducer(cuda.to_device(h_array), np.int32(0))\n", " check(h_array.size, result)\n", "\n", "\n", " def test_float32(size):\n", " reducer = CommutativeReduce(lambda a, b: a + b)\n", " h_array = np.ones(size, dtype=np.float32)\n", " result = reducer(cuda.to_device(h_array), np.float32(0.0))\n", " check(float(h_array.size), result)\n", "\n", "\n", " test_int32(1 << 28)\n", " test_float32(1 << 28)\n" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3.9.7 64-bit", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.7" }, "orig_nbformat": 4, "vscode": { "interpreter": { "hash": "9b4d75ac280b6c7c3aa43866cb82dc88915409b55fec83a093dd0284cb58708e" } } }, "nbformat": 4, "nbformat_minor": 2 }