# Labwork 3
This new labwork focuses on the SCAN pattern, which is the Swiss-knife of the little parallel programmer...

The three exercises are dealing with the Inclusive SCAN version, from left to right (no reverse).

## Exercise 1
In this exercise, you must implement a **block-scan**. 
It is a scan into a single block, for a small amount of data (256 values). 
Then, you should implement the PRAM algorithm, with the correct per-warp synchronizations...

NB: you must write the three device functions:
- `load_shared_memory`
- `pointer_jumping`
- `save_shared_memory`
The kernel should be read, too...

In [None]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


class BlockScan(object):
    """Create a scan object that scans values using a given binary
    function. The binary function is compiled once and cached inside this
    object. Keeping this object alive will prevent re-compilation.

    The scan is functional for a limited number of values, as it runs into a single block.
    THe number of values should be less than 256.
    """

    _cache = {}

    _WARP_SIZE = 32
    _NUM_WARPS = 8

    @staticmethod
    def _gpu_kernel_factory(fn, np_type):
        """Factory of kernels for the block-scan problem...

        This function returns a Cuda Kernel that does block-scan of some data using a given binary functor."""

        scan_op = cuda.jit(device=True)(fn)

        max_block_size = BlockScan._NUM_WARPS * BlockScan._WARP_SIZE

        @cuda.jit(device=True)
        def load_shared_memory(shared_memory, d_input):
            local_tid = cuda.threadIdx.x
            # TODO: load data into shared memory

        @cuda.jit(device=True)
        def pointer_jumping(shared_memory, jump):
            tid = cuda.threadIdx.x
            # TODO: implement the pointer jumping

        @cuda.jit(device=True)
        def save_shared_memory(shared_memory, d_output):
            local_tid = cuda.threadIdx.x
            # TODO: save data from shared memory

        def gpu_scan_block(d_input, d_output):
            """
            Per block SCAN...
            """
            # move data to shared memory
            shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
            load_shared_memory(shared_memory, d_input)

            # TODO: implements the logics (pointer jumping)

            # now stores the result
            save_shared_memory(shared_memory, d_output)

        return cuda.jit(gpu_scan_block)

    def __init__(self, functor):
        """
        :param functor: A function implementing a binary operation for
                        scan. It will be compiled as a CUDA device
                        function using ``cuda.jit(device=True)``.
        """
        self._functor = functor

    def _compile(self, dtype):
        key = self._functor, dtype
        if key not in self._cache:
            self._cache[key] = BlockScan._gpu_kernel_factory(self._functor, from_dtype(dtype))
        return self._cache[key]

    def __call__(self, d_input, d_output, verbose, stream=cuda.default_stream()):
        """ Performs a per-block SCAN.

        :param d_input: A host or device array.
        :param stream: Optional CUDA stream in which to perform the scan.
                    If no stream is specified, the default stream of 0 is used.
        """

        # ensure 1d array
        if d_input.ndim != 1:
            raise TypeError("only support 1D array")

        # ensure size > 0
        if d_input.size < 1:
            raise ValueError("array's length is 0")

        # ensure arrays' size are the same
        if d_input.size != d_output.size:
            raise ValueError("arrays' length are different ({d_input.size} / {d_output.size}")

        # ensure size < 256
        max_block_size = BlockScan._WARP_SIZE * BlockScan._NUM_WARPS
        if d_input.size < max_block_size:
            raise ValueError("array's length is greater than max block size")

        _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

        kernel = self._compile(d_input.dtype)

        # Perform the reduction on the GPU
        start_event = cuda.event(True)
        start_event.record(stream=stream)

        nb_threads = max_block_size
        nb_blocks = 1
        kernel[nb_blocks, nb_threads, stream](d_input, d_output)

        stop_event = cuda.event(True)
        stop_event.record(stream=stream)
        stop_event.synchronize()
        ct = cuda.event_elapsed_time(start_event, stop_event)

        core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

        return d_output.copy_to_host(), ct


if __name__ == "__main__":
    def display(array):
        for i in range(array.size):
            print(f"{array[i]} ", end="")
            if (i % 16) == 15:
                print("")


    def check(expected, result):
        it_works = np.array_equal(expected, result)
        if it_works:
            print(f'- seems to work')
        else:
            print(f'- seems to not work, here is your result:')
            display(result)


    def test_int32(size):
        scanner = BlockScan(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.int32)
        ct=[]
        verbose = True
        for _ in range(15):
            result, time = scanner(cuda.to_device(h_array), cuda.to_device(h_array), verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        check(np.cumsum(h_array), result)


    def test_float32(size):
        scanner = BlockScan(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.float32)
        ct=[]
        verbose = True
        for _ in range(15):
            result, time = scanner(cuda.to_device(h_array), cuda.to_device(h_array), verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        check(np.cumsum(h_array), result)


    def test_float64(size):
        scanner = BlockScan(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.float64)
        ct=[]
        verbose = True
        for _ in range(15):
            result, time = scanner(cuda.to_device(h_array), cuda.to_device(h_array), verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        check(np.cumsum(h_array), result)


    test_int32(1 << 8)
    test_float32(1 << 8)
    test_float64(1 << 8)


## Exercise 2
This exercise deals with a block SCAN, but here applying the **hybrid** strategy saw in lecture 3. 

The idea is then (for a single block):
- to load the data in shared memory
- to do a SCAN per warp (so without warp synchronization)
- to do a warp synchronization
- to do a SCAN considering the last value of each warp (into a single warp, so no warp synchronization)
- to do a warp synchronization
- to apply the final MAP for all warps except the first...

Try this wonderful strategy below.

NB: the computation time will be roughly speaking the same, but the idea is to apply this algorithm in simple condition first, before to do it at a larger scale.

In [None]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


class BlockScanHybrid(object):
    """Create a scan object that scans values using a given binary
    function. The binary function is compiled once and cached inside this
    object. Keeping this object alive will prevent re-compilation.

    The scan is functional for a limited number of values, as it runs into a single block.
    THe number of values should be less than 256.
    """

    _cache = {}

    _WARP_SIZE = 32
    _NUM_WARPS = 8

    @staticmethod
    def _gpu_kernel_factory(fn, np_type):
        """Factory of kernels for the block-scan problem...

        This function returns a Cuda Kernel that does block-scan of some data using a given binary functor."""

        scan_op = cuda.jit(device=True)(fn)

        max_block_size = BlockScanHybrid._NUM_WARPS * BlockScanHybrid._WARP_SIZE
        
        @cuda.jit(device=True)
        def load_shared_memory(shared_memory, d_input):
            local_tid = cuda.threadIdx.x
            # TODO: load data into shared memory

        @cuda.jit(device=True)
        def scan_per_warp(shared_memory):
            tid = cuda.threadIdx.x
            warp_tid = tid & 31
            # TODO: per warp scan

        @cuda.jit(device=True)
        def get_extra_value(shared_memory, last):
            tid = cuda.threadIdx.x
            warp_tid = tid & 31
            # TODO: collect last value of each warp (copy into "last")

        @cuda.jit(device=True)
        def apply_final_map(shared_memory, last):
            tid = cuda.threadIdx.x
            warp_id = tid >> 5
            # TODO: apply final MAP

        @cuda.jit(device=True)
        def save_shared_memory(shared_memory, d_output):
            local_tid = cuda.threadIdx.x
            # TODO: save data from shared memory

        def gpu_scan_block(d_input, d_output):
            """
            Per block SCAN...
            """
            # move data to shared memory
            shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
            extra_shared_memory = cuda.shared.array(shape=32, dtype=np_type)
            load_shared_memory(shared_memory, d_input)

            # TODO: implements the logics

        return cuda.jit(gpu_scan_block)

    def __init__(self, functor):
        """
        :param functor: A function implementing a binary operation for
                        scan. It will be compiled as a CUDA device
                        function using ``cuda.jit(device=True)``.
        """
        self._functor = functor

    def _compile(self, dtype):
        key = self._functor, dtype
        if key not in self._cache:
            self._cache[key] = BlockScanHybrid._gpu_kernel_factory(self._functor, from_dtype(dtype))
        return self._cache[key]

    def __call__(self, d_input, d_output=None, verbose=False, stream=cuda.default_stream()):
        """ Performs a per-block SCAN.

        :param d_input: A host or device array.
        :param stream: Optional CUDA stream in which to perform the scan.
                    If no stream is specified, the default stream of 0 is used.
        """

        # ensure 1d array
        if d_input.ndim != 1:
            raise TypeError("only support 1D array")

        # ensure size > 0
        if d_input.size < 1:
            raise ValueError("array's length is 0")

        # ensure arrays' size are the same
        if d_input.size != d_output.size:
            raise ValueError("arrays' length are different ({d_input.size} / {d_output.size}")

        # ensure size < 256
        max_block_size = BlockScanHybrid._WARP_SIZE * BlockScanHybrid._NUM_WARPS
        if d_input.size < max_block_size:
            raise ValueError("array's length is greater than max block size")

        _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

        kernel = self._compile(d_input.dtype)
        # turn on GPU...
        nb_threads = max_block_size
        nb_blocks = 1
        kernel[nb_blocks, nb_threads, stream](d_input, d_output)

        # Perform the reduction on the GPU
        start_event = cuda.event(True)
        start_event.record(stream=stream)

        nb_threads = max_block_size
        nb_blocks = 1
        kernel[nb_blocks, nb_threads, stream](d_input, d_output)

        stop_event = cuda.event(True)
        stop_event.record(stream=stream)
        stop_event.synchronize()
        ct = cuda.event_elapsed_time(start_event, stop_event)

        core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

        return d_output.copy_to_host(), ct


if __name__ == "__main__":
    def display(array):
        for i in range(array.size):
            print(f"{array[i]} ", end="")
            if (i % 16) == 15:
                print("")


    def check(expected, result):
        it_works = True
        for i in range(result.size):
            if expected[i] != result[i]:
                it_works = False
        if it_works:
            print(f'- seems to work')
        else:
            print(f'- seems to not work, here is your result:')
            display(result)


    def test_int32(size):
        scanner = BlockScanHybrid(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.int32)
        ct=[]
        verbose = True
        for _ in range(15):
            result, time = scanner(cuda.to_device(h_array), cuda.to_device(h_array), verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        check(np.cumsum(h_array), result)


    def test_float32(size):
        scanner = BlockScanHybrid(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.float32)
        ct=[]
        verbose = True
        for _ in range(15):
            result, time = scanner(cuda.to_device(h_array), cuda.to_device(h_array), verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        check(np.cumsum(h_array), result)


    def test_float64(size):
        scanner = BlockScanHybrid(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.float64)
        ct=[]
        verbose = True
        for _ in range(15):
            result, time = scanner(cuda.to_device(h_array), cuda.to_device(h_array), verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        check(np.cumsum(h_array), result)


    test_int32(1 << 8)
    test_float32(1 << 8)
    test_float64(1 << 8)


## Exercise 3
In this exercise you must implement the **full scan** for a given binary associative operator.

The lecture 3 gives you all the clue for this purpose:
- the per block SCAN, with its extra value,
- the inter-block extra value scan which is a recursive call,
- the MAP to end the SCAN by adding the extra block scanned values (with shift). 

In [None]:
from __future__ import annotations

import numpy as np
from numba.np.numpy_support import from_dtype
from numba import cuda, core


def display(array, all_values=False):
    def aprint(data_range, noend=False):
        for i in data_range:
            print(f"{array[i]} ", end="")
            if (i % 16) == 15: print("")
        if not noend: print("...")

    if all_values:
        aprint(range(array.size), noend=True)
        return
    aprint(range(0, min(array.size, 16)))
    if array.size < 256 - 16: return
    aprint(range(256 - 16, min(array.size, 256 + 16)))
    if array.size < 512 - 16: return
    aprint(range(512 - 16, min(array.size, 512 + 16)))
    if array.size < 768 - 16: return
    aprint(range(768 - 16, min(array.size, 768 + 16)))
    aprint(range(array.size - 16 - 256, array.size - 256 + 16))
    aprint(range(array.size - 16, array.size), noend=True)


class InclusiveScan(object):
    """Create a scan object that scans values using a given binary
    function. The binary function is compiled once and cached inside this
    object. Keeping this object alive will prevent re-compilation.

    The scan is functional for a limited number of values, as it runs into a single block.
    THe number of values should be less than 256.
    """

    _kernels_block_scan = {}
    _kernels_block_map = {}

    _WARP_SIZE = 32
    _NUM_WARPS = 8

    @staticmethod
    def _gpu_kernel_block_scan_factory(fn, np_type):
        """Factory of kernels for the block-scan problem...

        This function returns a Cuda Kernel that does block-scan of some data using a given binary functor."""

        scan_op = cuda.jit(device=True)(fn)

        max_block_size = InclusiveScan._NUM_WARPS * InclusiveScan._WARP_SIZE

        @cuda.jit(device=True)
        def load_shared_memory(shared_memory, d_input):
            local_tid = cuda.threadIdx.x
            tid = cuda.grid(1)
            # TODO: load data into shared memory

        @cuda.jit(device=True)
        def pointer_jumping(shared_memory, jump):
            tid = cuda.threadIdx.x
            # TODO: implement the pointer jumping (full block)

        @cuda.jit(device=True)
        def save_shared_memory(shared_memory, d_output, d_extras):
            local_tid = cuda.threadIdx.x
            global_tid = cuda.grid(1)
            # TODO: save data from shared memory
            
            # TODO: save last block value to "d_extras"!

        def gpu_scan_block(d_input, d_output, d_extras):
            """
            Per block SCAN...
            """
            # move data to shared memory
            shared_memory = cuda.shared.array(shape=max_block_size, dtype=np_type)
            load_shared_memory(shared_memory, d_input)

            # TODO: implements the logics

            # now stores the result
            save_shared_memory(shared_memory, d_output, d_extras)

        return cuda.jit(gpu_scan_block)

    def __init__(self, functor):
        """
        :param functor: A function implementing a binary operation for
                        scan. It will be compiled as a CUDA device
                        function using ``cuda.jit(device=True)``.
        """
        self._functor = functor

    def _compile_block_scan(self, dtype):
        key = self._functor, dtype
        if key not in self._kernels_block_scan:
            self._kernels_block_scan[key] = InclusiveScan._gpu_kernel_block_scan_factory(self._functor,
                                                                                         from_dtype(dtype))
        return self._kernels_block_scan[key]

    @staticmethod
    def _gpu_kernel_block_map_factory(fn, np_type):
        """Factory of kernels for the block-map problem...

        This function returns a Cuda Kernel that does block-map of some data using a given binary functor."""

        scan_op = cuda.jit(device=True)(fn)

        def gpu_map_block(d_io, d_scan):
            """
            Per block MAP...
            """
            # TODO: implements the logics

        return cuda.jit(gpu_map_block)

    def _compile_block_map(self, dtype):
        key = self._functor, dtype
        if key not in self._kernels_block_map:
            self._kernels_block_map[key] = InclusiveScan._gpu_kernel_block_map_factory(self._functor, from_dtype(dtype))
        return self._kernels_block_map[key]

    def __call__(self, d_input, d_output, verbose=False, stream=cuda.default_stream()):
        """ Performs a per-block SCAN.

        :param d_input: A host or device array.
        :param stream: Optional CUDA stream in which to perform the scan.
                    If no stream is specified, the default stream of 0 is used.
        """

        # ensure 1d array
        if d_input.ndim != 1:
            raise TypeError("only support 1D array")

        # ensure size > 0
        if d_input.size < 1:
            raise ValueError("array's length is 0")

        # ensure arrays' size are the same
        if d_input.size != d_output.size:
            raise ValueError("arrays' length are different ({d_input.size} / {d_output.size}")

        _sav, core.config.CUDA_LOW_OCCUPANCY_WARNINGS = core.config.CUDA_LOW_OCCUPANCY_WARNINGS, False

        kernel_step1 = self._compile_block_scan(d_input.dtype)
        kernel_step2 = self._compile_block_map(d_input.dtype)

        max_block_size = InclusiveScan._WARP_SIZE * InclusiveScan._NUM_WARPS
        nb_threads = min(max_block_size, d_input.size)
        nb_blocks = (d_input.size + nb_threads - 1) // nb_threads
        extras = cuda.device_array(shape=nb_blocks, dtype=d_input.dtype)

        # Perform the reduction on the GPU
        start_event = cuda.event(True)
        start_event.record(stream=stream)

        kernel_step1[nb_blocks, nb_threads, stream](d_input, d_output, extras)
        if nb_blocks > 1:
            # display(extras, all=True)
            self(extras, extras, stream)
            kernel_step2[nb_blocks, nb_threads, stream](d_output, extras)

        stop_event = cuda.event(True)
        stop_event.record(stream=stream)
        stop_event.synchronize()

        core.config.CUDA_LOW_OCCUPANCY_WARNINGS = _sav

        # display(extras)

        return cuda.event_elapsed_time(start_event, stop_event)


if __name__ == "__main__":

    def check(expected, result):
        it_works = np.array_equal(expected, result)
        if it_works:
            print(f'- seems to work')
        else:
            print(f'- seems to not work, here is your result:')
            display(result)
            display(expected)


    def test_int32(size):
        scanner = InclusiveScan(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.int32)
        d_result = cuda.device_array(size, dtype=np.int32)
        ct=[]
        verbose = True
        for _ in range(15):
            time = scanner(cuda.to_device(h_array), d_result, verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        expected = np.cumsum(h_array)
        check(expected, d_result.copy_to_host())


    def test_float32(size):
        scanner = InclusiveScan(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.float32)
        d_result = cuda.device_array(size, dtype=np.float32)
        ct=[]
        verbose = True
        for _ in range(15):
            time = scanner(cuda.to_device(h_array), d_result, verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        expected = np.cumsum(h_array)
        check(expected, d_result.copy_to_host())


    def test_float64(size):
        scanner = InclusiveScan(lambda a, b: a + b)
        h_array = np.ones(size, dtype=np.float64)
        d_result = cuda.device_array(size, dtype=np.float64)
        ct=[]
        verbose = True
        for _ in range(15):
            time = scanner(cuda.to_device(h_array), d_result, verbose)
            verbose = False
            ct.append(time)
        print(f"minimum kernel computation time is {min(ct)} ms")
        expected = np.cumsum(h_array)
        check(expected, d_result.copy_to_host())


    test_int32(1 << 24)
    # with float32, the sequential cumsum fails with more than 2^24 number...
    test_float32(1 << 24)
    test_float64(1 << 24)
