# Labwork 2

Run the following cell to ensure numba is installed (skip if you are sure)

In [1]:
!pip install numba

Defaulting to user installation because normal site-packages is not writeable




Run the following cell to verify that numba can use Cuda...

In [2]:
%%python
from numba import cuda

def bool_to_str(predicate: bool) -> str:
 if predicate:
 return 'yes'
 return 'no'

def is_cuda_available_str() -> str:
 return bool_to_str(cuda.is_available())

print(f'is cuda available? {is_cuda_available_str()}')

is cuda available? yes


Another simple solution to detect and print the devices from [numba documentation](https://numba.readthedocs.io/en/stable/cuda-reference/host.html#device-management):

In [3]:
%%python
from numba import cuda

if not cuda.detect():
 raise Exception("we do not have cuda :-(")

Found 1 CUDA devices
id 0 b'Quadro T2000 with Max-Q Design' [SUPPORTED]
 Compute Capability: 7.5
 PCI Device ID: 0
 PCI Bus ID: 1
 UUID: GPU-d0ef4cd4-a0bf-4d14-21df-26d411426265
 Watchdog: Enabled
 Compute Mode: WDDM
 FP32/FP64 Performance Ratio: 32
Summary:
	1/1 devices are supported


Run the following cell for a short test of numba/cuda...

In [4]:
%%python
# This is a MAP example
from numba import cuda
import numpy as np 


@cuda.jit
def increment_by_one(an_array):
 # Thread id in a 1D block
 tx = cuda.threadIdx.x
 # Block id in a 1D grid
 ty = cuda.blockIdx.x
 # Block width, i.e. number of threads per block
 bw = cuda.blockDim.x
 # Compute flattened index inside the array
 pos = tx + ty * bw
 if pos < an_array.size: # Check array boundaries
 an_array[pos] += 1
 

if __name__ == "__main__":
 # build a big vector
 h_array = np.arange(1<<16)
 
 d_array = cuda.to_device(h_array)
 
 threads_per_block = 32*8 # 8 warps, 256 threads per block
 blocks_per_grid = (h_array.size + (threads_per_block - 1)) // threads_per_block
 increment_by_one[blocks_per_grid, threads_per_block](d_array)
 
 d_array.copy_to_host(h_array)
 
 for i in range(len(h_array)):
 assert h_array[i] == i+1, f'bad value at index {i}: {h_array[i]+1}'
 print('it woks!')

it woks!


From here, it is clear that some code are easy to do:
- binary transform
- gather
- scatter
- and basic reduce

The first labwork (of week 2) introduces a quite simple but not generic reduce. 
This version suffers from many default:
- the operation is hard-coded ;
- this version is not optimal considering data loading...

The last point needs some explanations. Actually, when a thread loads one data, it is through a vector-instruction and implies the collaboration of the 32 threads of the same warp. 
If all the threads of a warp needs to load data, then we have different cases:
1. The threads are loading **coalescent** data (contiguous, located into a 128 bytes array or less).
2. The threads are not loading **coalescent** data...

For the first case, actually the 32 loads are managed together through a single vector operation. 
For the second case, then the SIMD processor needs (up to) 32 data load instructions. 

Obviously the first case if much more fast than the second! Especially when you consider the data loading processing time, which is quite huge (some hundreds of cycles!). It is very important to try to load all the data in the first configuration with coalescent loading. The same for the writing. 

Then, in this lab work you will revisit the REDUCE pattern, considering different optimizations regarding the data loading and the operator properties (associative, but also commutativity).

## First exercise
In this first version, we restart with the more general REDUCE version given in the lectures 1 and 2.

Here, the operator is associative but not commutative (for instance, matrix multiplication is such an operator!). 

Take a look a the implementation, especially to the kernel...

Complete the kernel where needed only (TODO keyword).

In [42]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


class BasicReduce(object):
 """Create a reduction object that reduces values using a given binary
 function. The binary function is compiled once and cached inside this
 object. Keeping this object alive will prevent re-compilation.
 """

 _cache = {}

 _WARP_SIZE = 32
 _NUM_WARPS = 32

 @staticmethod
 def _gpu_kernel_factory(fn, np_type):
 """Factory of kernels for the reduction problem...

 This function returns a Cuda Kernel that does the reduction of some data using a given binary functor."""

 reduce_op = cuda.jit(device=True)(fn)

 max_block_size = BasicReduce._NUM_WARPS * BasicReduce._WARP_SIZE

 @cuda.jit(device=True)
 def load_shared_memory(shared_memory, arr, null_value):
 global_tid = cuda.grid(1)
 local_tid = cuda.threadIdx.x
 # TODO: fill shared_memory
 
 # wait all warps
 cuda.syncthreads()

 @cuda.jit(device=True)
 def pointer_jumping(shared_memory, jump):
 local_tid = cuda.threadIdx.x
 right = local_tid + jump
 # TODO: implement the pointer jumping with synchronization

 def gpu_reduce_block(arr, partials, null_value):
 """
 Per block reduction, basic version...
 """
 # move data to shared memory
 shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
 load_shared_memory(shared_memory, arr, null_value)

 # do the jumping...
 blk_size = cuda.blockDim.x
 jump = 1
 while jump < blk_size:
 pointer_jumping(shared_memory, jump)
 jump = jump * 2

 # now stores the result
 if cuda.threadIdx.x == 0:
 blk_id = cuda.blockIdx.x
 partials[blk_id] = shared_memory[0]

 return cuda.jit(gpu_reduce_block)

 def __init__(self, functor):
 """
 :param functor: A function implementing a binary operation for
 reduction. It will be compiled as a CUDA device
 function using ``cuda.jit(device=True)``.
 """
 self._functor = functor

 def _compile(self, dtype):
 key = self._functor, dtype
 if key not in self._cache:
 self._cache[key] = BasicReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))
 return self._cache[key]

 def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):
 """Performs a full reduction.

 :param arr: A host or device array.
 :param null_value: zero value for array.dtype
 :param res: Device array into which to write the reduction
 result to. The result is written into the first element of
 this array if this array is provided.
 :param stream: Optional CUDA stream in which to perform the reduction.
 If no stream is specified, the default stream of 0 is
 used.
 """

 # ensure 1d array
 if arr.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if arr.size < 1:
 raise ValueError("array's length is 0")

 _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

 kernel = self._compile(arr.dtype)

 # Perform the reduction on the GPU
 start_event = cuda.event(True)
 start_event.record(stream=stream)
 nb_threads = self._NUM_WARPS * self._WARP_SIZE

 while True:
 if nb_threads > arr.size:
 nb_threads = arr.size

 nb_blocks = (arr.size + nb_threads - 1) // nb_threads

 temp = cuda.device_array(shape=nb_blocks, dtype=arr.dtype, stream=stream)

 kernel[nb_blocks, nb_threads, stream](arr, temp, null_value)
 print(f"launch {nb_blocks} for {nb_threads * nb_blocks} threads")
 cuda.synchronize()
 arr = temp
 if nb_blocks == 1:
 break

 stop_event = cuda.event(True)
 stop_event.record(stream=stream)
 stop_event.synchronize()
 ct = cuda.event_elapsed_time(start_event, stop_event)
 print(f"kernel computation time is {ct} ms")

 # handle return value
 if res is not None:
 res.copy_to_device(arr)

 core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

 return arr.copy_to_host(stream=stream)[0]


if __name__ == "__main__":
 def check(expected, result):
 if expected == result:
 print(f'- seems to work {result}')
 else:
 print(f'- seems to not work {result}')

 def test_int32(size):
 reducer = BasicReduce(lambda a,b: a+b)
 h_array = np.ones(size, dtype=np.int32)
 result = reducer(cuda.to_device(h_array), np.int32(0))
 check(h_array.size, result)

 def test_float32(size):
 reducer = BasicReduce(lambda a,b: a+b)
 h_array = np.ones(size, dtype=np.float32)
 result = reducer(cuda.to_device(h_array), np.float32(0.0))
 check(float(h_array.size), result)

 test_int32(1 << 28)
 test_float32(1 << 28)


maximum seems to work


## Second exercise
One big problem with the previous implementation is linked to the **register pressure**. 

A block of threads runs onto a SMP, with others blocks. 
But a SMP has a limited number of registers, to partition to all the threads running into blocks. 

When a program uses too much threads, then the number of blocks running onto a single SMP is reduced... That is a limit to the speed-up since, at the same time, using less blocks implies less threads and so longer computation time. 

Using shared memory impacts directly on the number of registers, since the shared memory uses SMP registers... It is then important to reduce the size of the shared memory, and so in our reduce algorithm the size of the blocks. 

Try the same code but with 256 warps only. Notice the computation times...

In [None]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


class BasicReduce(object):
 """Create a reduction object that reduces values using a given binary
 function. The binary function is compiled once and cached inside this
 object. Keeping this object alive will prevent re-compilation.
 """

 _cache = {}

 _WARP_SIZE = 32
 _NUM_WARPS = 32 # TODO: modify this value...

 @staticmethod
 def _gpu_kernel_factory(fn, np_type):
 """Factory of kernels for the reduction problem...

 This function returns a Cuda Kernel that does the reduction of some data using a given binary functor."""

 reduce_op = cuda.jit(device=True)(fn)

 max_block_size = BasicReduce._NUM_WARPS * BasicReduce._WARP_SIZE

 @cuda.jit(device=True)
 def load_shared_memory(shared_memory, arr, null_value):
 global_tid = cuda.grid(1)
 local_tid = cuda.threadIdx.x
 # TODO: fill shared_memory
 
 # wait all warps
 cuda.syncthreads()

 @cuda.jit(device=True)
 def pointer_jumping(shared_memory, jump):
 local_tid = cuda.threadIdx.x
 right = local_tid + jump
 # TODO: implement the pointer jumping with synchronization

 def gpu_reduce_block(arr, partials, null_value):
 """
 Per block reduction, basic version...
 """
 # move data to shared memory
 shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
 load_shared_memory(shared_memory, arr, null_value)

 # do the jumping...
 blk_size = cuda.blockDim.x
 jump = 1
 while jump < blk_size:
 pointer_jumping(shared_memory, jump)
 jump = jump * 2

 # now stores the result
 if cuda.threadIdx.x == 0:
 blk_id = cuda.blockIdx.x
 partials[blk_id] = shared_memory[0]

 return cuda.jit(gpu_reduce_block)

 def __init__(self, functor):
 """
 :param functor: A function implementing a binary operation for
 reduction. It will be compiled as a CUDA device
 function using ``cuda.jit(device=True)``.
 """
 self._functor = functor

 def _compile(self, dtype):
 key = self._functor, dtype
 if key not in self._cache:
 self._cache[key] = BasicReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))
 return self._cache[key]

 def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):
 """Performs a full reduction.

 :param arr: A host or device array.
 :param null_value: zero value for array.dtype
 :param res: Device array into which to write the reduction
 result to. The result is written into the first element of
 this array if this array is provided.
 :param stream: Optional CUDA stream in which to perform the reduction.
 If no stream is specified, the default stream of 0 is
 used.
 """

 # ensure 1d array
 if arr.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if arr.size < 1:
 raise ValueError("array's length is 0")

 _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

 kernel = self._compile(arr.dtype)

 # Perform the reduction on the GPU
 start_event = cuda.event(True)
 start_event.record(stream=stream)
 nb_threads = self._NUM_WARPS * self._WARP_SIZE

 while True:
 if nb_threads > arr.size:
 nb_threads = arr.size

 nb_blocks = (arr.size + nb_threads - 1) // nb_threads

 temp = cuda.device_array(shape=nb_blocks, dtype=arr.dtype, stream=stream)

 kernel[nb_blocks, nb_threads, stream](arr, temp, null_value)
 print(f"launch {nb_blocks} for {nb_threads * nb_blocks} threads")
 cuda.synchronize()
 arr = temp
 if nb_blocks == 1:
 break

 stop_event = cuda.event(True)
 stop_event.record(stream=stream)
 stop_event.synchronize()
 ct = cuda.event_elapsed_time(start_event, stop_event)
 print(f"kernel computation time is {ct} ms")

 # handle return value
 if res is not None:
 res.copy_to_device(arr)

 core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

 return arr.copy_to_host(stream=stream)[0]


if __name__ == "__main__":
 def check(expected, result):
 if expected == result:
 print(f'- seems to work {result}')
 else:
 print(f'- seems to not work {result}')

 def test_int32(size):
 reducer = BasicReduce(lambda a,b: a+b)
 h_array = np.ones(size, dtype=np.int32)
 result = reducer(cuda.to_device(h_array), np.int32(0))
 check(h_array.size, result)

 def test_float32(size):
 reducer = BasicReduce(lambda a,b: a+b)
 h_array = np.ones(size, dtype=np.float32)
 result = reducer(cuda.to_device(h_array), np.float32(0.0))
 check(float(h_array.size), result)

 test_int32(1 << 28)
 test_float32(1 << 28)


## Third exercise
The objective here is to optimize the former reduce.
We considerer here that the binary operation is associative but not *commutative*.
So, we cannot permute the data...

The simpler optimization consists to do:
- a reduce per warp,
- write the per warp result at position warp_id (so `threadIdx.x // 32`),
- conclude by a last reduce per warp for warp 0,
- and save the per block result. 

Implement this strategy and note the speed-up...

In [39]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


class AssociativeReduce(object):
 """Create a reduction object that reduces values using a given binary
 function. The binary function is compiled once and cached inside this
 object. Keeping this object alive will prevent re-compilation.
 """

 _cache = {}

 _WARP_SIZE = 32
 _NUM_WARPS = 8

 @staticmethod
 def _gpu_kernel_factory(fn, np_type):
 """Factory of kernels for the reduction problem...

 This function returns a Cuda Kernel that does the reduction of some data using a given binary functor."""

 reduce_op = cuda.jit(device=True)(fn)

 max_block_size = AssociativeReduce._NUM_WARPS * AssociativeReduce._WARP_SIZE

 @cuda.jit(device=True)
 def load_shared_memory(shared_memory, arr, null_value):
 global_tid = cuda.grid(1)
 local_tid = cuda.threadIdx.x
 # TODO: same as first exercise

 # wait all warps
 cuda.syncthreads()

 @cuda.jit(device=True)
 def reduce_per_warp(shared_memory):
 warp_id = cuda.threadIdx.x // 32
 in_warp_id = cuda.threadIdx.x % 32
 # TODO: implement the pointer jumping without synchronization

 def gpu_reduce_block(arr, partials, null_value):
 """
 Per block reduction, basic version...
 """
 # move data to shared memory
 shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
 load_shared_memory(shared_memory, arr, null_value)

 # TODO: implements the logics

 # now stores the result
 if cuda.threadIdx.x == 0:
 blk_id = cuda.blockIdx.x
 partials[blk_id] = shared_memory[0]

 return cuda.jit(gpu_reduce_block)

 def __init__(self, functor):
 """
 :param functor: A function implementing a binary operation for
 reduction. It will be compiled as a CUDA device
 function using ``cuda.jit(device=True)``.
 """
 self._functor = functor

 def _compile(self, dtype):
 key = self._functor, dtype
 if key not in self._cache:
 self._cache[key] = AssociativeReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))
 return self._cache[key]

 def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):
 """Performs a full reduction.

 :param arr: A host or device array.
 :param null_value: zero value for array.dtype
 :param res: Device array into which to write the reduction
 result to. The result is written into the first element of
 this array if this array is provided.
 :param stream: Optional CUDA stream in which to perform the reduction.
 If no stream is specified, the default stream of 0 is
 used.
 """

 # ensure 1d array
 if arr.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if arr.size < 1:
 raise ValueError("array's length is 0")

 _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

 kernel = self._compile(arr.dtype)

 # Perform the reduction on the GPU
 start_event = cuda.event(True)
 start_event.record(stream=stream)
 nb_threads = self._NUM_WARPS * self._WARP_SIZE

 while True:
 if nb_threads > arr.size:
 nb_threads = arr.size

 nb_blocks = (arr.size + nb_threads - 1) // nb_threads

 temp = cuda.device_array(shape=nb_blocks, dtype=arr.dtype, stream=stream)

 kernel[nb_blocks, nb_threads, stream](arr, temp, null_value)
 print(f"launch {nb_blocks} for {nb_threads * nb_blocks} threads")
 cuda.synchronize()
 arr = temp
 if nb_blocks == 1:
 break

 stop_event = cuda.event(True)
 stop_event.record(stream=stream)
 stop_event.synchronize()
 ct = cuda.event_elapsed_time(start_event, stop_event)
 print(f"kernel computation time is {ct} ms")

 # handle return value
 if res is not None:
 res.copy_to_device(arr)

 core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

 return arr.copy_to_host(stream=stream)[0]


if __name__ == "__main__":
 def check(expected, result):
 if expected == result:
 print(f'- seems to work {result}')
 else:
 print(f'- seems to not work {result}')


 def test_int32(size):
 reducer = AssociativeReduce(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.int32)
 result = reducer(cuda.to_device(h_array), np.int32(0))
 check(h_array.size, result)


 def test_float32(size):
 reducer = AssociativeReduce(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.float32)
 result = reducer(cuda.to_device(h_array), np.float32(0.0))
 check(float(h_array.size), result)


 test_int32(1 << 28)
 test_float32(1 << 28)


maximum seems to work


## Fourth exercise
Now, let us consider what happens with a *commutative* binary operation. 
In such a case, it is possible to reduce the number of Cuda blocks to *the number of threads per block* at most, and so to limit the number of recursive calls (at most 2 calls, the last with a single block). 

For this, each thread should:
- do a local reduce in a private register with a modulo strategy, 
- and then terminate as in second exercise. 

Implements this strategy in the following cell. 

In [44]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


class CommutativeReduce(object):
 """Create a reduction object that reduces values using a given binary
 function. The binary function is compiled once and cached inside this
 object. Keeping this object alive will prevent re-compilation.
 """

 _cache = {}

 _WARP_SIZE = 32
 _NUM_WARPS = 8

 @staticmethod
 def _gpu_kernel_factory(fn, np_type):
 """Factory of kernels for the reduction problem...

 This function returns a Cuda Kernel that does the reduction of some data using a given binary functor."""

 reduce_op = cuda.jit(device=True)(fn)

 max_block_size = CommutativeReduce._NUM_WARPS * CommutativeReduce._WARP_SIZE

 @cuda.jit(device=True)
 def load_shared_memory(shared_memory, arr, null_value):
 global_tid = cuda.grid(1)
 local_tid = cuda.threadIdx.x
 # TODO: load with a modulo scheme

 # wait all warps
 cuda.syncthreads()

 @cuda.jit(device=True)
 def reduce_per_warp(shared_memory):
 warp_id = cuda.threadIdx.x // 32
 in_warp_id = cuda.threadIdx.x % 32
 # TODO: implement the pointer jumping without synchronization

 def gpu_reduce_block(arr, partials, null_value):
 """
 Per block reduction, basic version...
 """
 # move data to shared memory
 shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
 load_shared_memory(shared_memory, arr, null_value)

 # TODO: implements the logics

 # now stores the result
 if cuda.threadIdx.x == 0:
 blk_id = cuda.blockIdx.x
 partials[blk_id] = shared_memory[0]

 return cuda.jit(gpu_reduce_block)

 def __init__(self, functor):
 """
 :param functor: A function implementing a binary operation for
 reduction. It will be compiled as a CUDA device
 function using ``cuda.jit(device=True)``.
 """
 self._functor = functor

 def _compile(self, dtype):
 key = self._functor, dtype
 if key not in self._cache:
 self._cache[key] = CommutativeReduce._gpu_kernel_factory(self._functor, from_dtype(dtype))
 return self._cache[key]

 def __call__(self, arr, null_value, res=None, stream=cuda.default_stream()):
 """Performs a full reduction.

 :param arr: A host or device array.
 :param null_value: zero value for array.dtype
 :param res: Device array into which to write the reduction
 result to. The result is written into the first element of
 this array if this array is provided.
 :param stream: Optional CUDA stream in which to perform the reduction.
 If no stream is specified, the default stream of 0 is
 used.
 """

 # ensure 1d array
 if arr.ndim != 1:
 raise TypeError("only support 1D array")

 # ensure size > 0
 if arr.size < 1:
 raise ValueError("array's length is 0")

 _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

 kernel = self._compile(arr.dtype)

 # Perform the reduction on the GPU
 start_event = cuda.event(True)
 start_event.record(stream=stream)
 nb_threads = self._NUM_WARPS * self._WARP_SIZE

 while True:
 if nb_threads > arr.size:
 nb_threads = arr.size

 # TODO implements the logics

 stop_event = cuda.event(True)
 stop_event.record(stream=stream)
 stop_event.synchronize()
 ct = cuda.event_elapsed_time(start_event, stop_event)
 print(f"kernel computation time is {ct} ms")

 # handle return value
 if res is not None:
 res.copy_to_device(arr)

 core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

 return arr.copy_to_host(stream=stream)[0]


if __name__ == "__main__":
 def check(expected, result):
 if expected == result:
 print(f'- seems to work {result}')
 else:
 print(f'- seems to not work {result}')


 def test_int32(size):
 reducer = CommutativeReduce(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.int32)
 result = reducer(cuda.to_device(h_array), np.int32(0))
 check(h_array.size, result)


 def test_float32(size):
 reducer = CommutativeReduce(lambda a, b: a + b)
 h_array = np.ones(size, dtype=np.float32)
 result = reducer(cuda.to_device(h_array), np.float32(0.0))
 check(float(h_array.size), result)


 test_int32(1 << 28)
 test_float32(1 << 28)


launch 65536 for 16777216 threads
launch 256 for 65536 threads
Maximum is 1048575
maximum seems to work
