Problem 8.5

Average sorting

Suppose that instead of sorting an array, we just require that the elements increase on average. More precisely, we call an $n$-element array k-sorted if, for all $i = 1, 2, \ldots, n - k$, the following holds:

$$ \frac{\sum_{j=i}^{i+k-1}A[j]}{k} \le \frac{\sum_{j=i + 1}^{i+k}A[j]}{k} $$

  1. What does it mean for an array to be 1-sorted?
  2. Give a permutation of the numbers $1, 2, \ldots, 10$ that is 2-sorted, but not sorted
  3. Prove that an $n$-element array is k-sorted if and only if $A[i] \le A[i+k]$ for all $i = 1, 2, \ldots, n-k$.
  4. Give an algorithm that $k$-sorts an $n$-element array in $\O(n\lg(n/k))$ time.

We can also show a lower bound on the time to produce a $k$-sorted array, when $k$ is a constant.

  1. Show that we can sort a $k$-sorted array of length $n$ in $\O(n\lg{k})$ time. (Hint: Use the solution to [exercise 6.5-9](/06/05/09.html).)
  2. Show that when $k$ is a constant, $k$-sorting an $n$-element array requires $\Omega(n\lg{n})$ time. (Hint: Use the solution to the previous part along with the lower bound on comparison sorts.)

Intuitive description

I'm going to state an unintuitive description of what does it mean for an array to be $k$-sorted. It will be presented without proof. It will become evident later in the text.

If we write out $k$-sorted array in a grid with $k$ columns, each column will be sorted from top to bottom. This is essentially what (c) means.

Various notions

For an array to be 1-sorted it means that it is sorted in the traditional sense of the word, that is, $A[i] \le A[i+1]$ for each index $i$.

Here's a 2-sorted permutation that is not sorted: $2, 1, 4, 3, 6, 5, 8, 7, 10, 9$.

Alternative condition

Let's assume that an array is $k$-sorted. Then:

$$ \frac{\sum_{j=i}^{i+k-1}A[j]}{k} \le \frac{\sum_{j=i + 1}^{i+k}A[j]}{k} \\ \Updownarrow \\ \frac{A[i] + \sum_{j=i+1}^{i+k-1}A[j]}{k} \le \frac{\sum_{j=i+1}^{i+k-1}A[j] + A[i+k]}{k} \\ \Updownarrow \\ \frac{A[i]}{k} \le \frac{A[i+k]}{k} \\ \Updownarrow \\ A[i] \le A[i+k] $$

Note that this derivation works backwards for proving the if part.

The algorithms

To $k$-sort the array, we need to sort the $k$ columns, that is, $k$ arrays of $n/k$ elements. This is a $(n/k)\lg(n/k)$ algorithm, performed $k$ times. We can use merge-sort. We don't even need to copy the array - we can calculate the indices on the fly, although that turned less fun than expected.

To sort a $k$-sorted array, we just do what we suggested in exercise 6.5-9 - we build a min heap and every time we take an element from it, we pick another element from the column the minimal element was in. Keeping track of that in C is a bit hairy, so the implementation is gruesome. There is a new operation, MIN-HEAP-PUSH-POP, that is an improvement over first extracting the element and then inserting another one.

Lower bound on comparisons

The problem can be reduced to $k$ problems of size $(n/k)$, each with minimal number of comparisons $\Omega((n/k)\lg(n/k))$. The total is $\Omega(n\lg(n/k))$ and if $k$ is a constant, we get a very familiar $\Omega(n\lg{n})$. Really not that surprising.


C code

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>

typedef struct {
    int value;
    int s;
} item;

typedef struct {
    item *elements;
    int length;
    int heap_size;
} heap_t;

typedef struct {
    int size;
    int k;
    int exhausted;
    int *next_indices;
} sort_state_t;

void merge_sort(int A[], int p, int r, int k, int s);
void min_heap_insert(heap_t *heap, item key);
int state_took_column(sort_state_t *state, int index);
item min_heap_push_pop(heap_t *heap, item new);
item heap_minimum(heap_t *heap);
item heap_extract_min(heap_t *heap);

/*
 * Average soting is performed by just merge-sorting each column. That was
 * easy. Modifying merge sort was hard.
 */

void k_sort(int *numbers, int size, int k) {
    for (int i = 0; i < k; i++) {
        merge_sort(numbers, 0, size, k, i);
    }
}

/*
 * Sorting a k-sorted array. We need to keep track of which column produced
 * the minumum element in the heap and this resulted in quite the tricky C
 * code. I don't think this is a good practice, but still, that's the best I'm
 * willing to make it right now.
 */

void merge_k_sorted(int *numbers, int size, int k) {
    int copy[size];

    item heap_elements[k];
    heap_t heap = {heap_elements, k, 0};

    int next_indices[k];
    sort_state_t state = {size, k, 0, next_indices};

    memcpy(copy, numbers, size * sizeof(int));

    for (int i = 0; i < k; i++) {
        item new = {copy[i], i};
        min_heap_insert(&heap, new);
        next_indices[i] = i + k;
    }

    for (int i = 0; i < size; i++) {
        item min = heap_minimum(&heap);
        numbers[i] = min.value;

        int next = state_took_column(&state, min.s);

        if (next != -1) {
            min_heap_push_pop(&heap, (item) {copy[next], next % k});
        } else {
            heap_extract_min(&heap);
        }
    }
}

int state_took_column(sort_state_t *state, int index) {
    int size = state->size,
        k = state->k,
        s = index,
        *next_indices = state->next_indices;

    if (next_indices[s] >= size) {
        while (state->exhausted < k && next_indices[state->exhausted] >= state->size) {
            state->exhausted++;
        }

        if (state->exhausted == k) {
            return -1;
        }

        int next = next_indices[state->exhausted];
        next_indices[state->exhausted] += k;
        return next;
    } else {
        next_indices[s] += k;
        return s;
    }
}

/*
 * This is the merge sort from Chapter 2, modified to look only at indices
 * congruent to k modulo s. There are two very ugly and long macroses that
 * perform this unpleasant job. There's probably a nicer way to do the
 * calculation, but modular arithmetic has always been my Achilles' heel.
 */

#define FIRST(index, k, s) ((index) + (s) - (index) % (k) + ((index) % (k) <= (s) ? 0 : (k)))
#define COUNT(a, b, k, s) (((b) - (a)) / (k) + ((((s) - (a) % (k)) + (k)) % (k) < ((b) - (a)) % (k) ? 1 : 0))

void merge(int A[], int p, int q, int r, int k, int s) {
    int i, j, l;

    int n1 = COUNT(p, q, k, s);
    int n2 = COUNT(q, r, k, s);

    int L[n1];
    int R[n2];

    for (i = FIRST(p, k, s), j = 0; i < q; j++, i += k) L[j] = A[i];
    for (i = FIRST(q, k, s), j = 0; i < r; j++, i += k) R[j] = A[i];

    for(i = 0, j = 0, l = FIRST(p, k, s); l < r; l += k) {
        if (i == n1) {
            A[l] = R[j++];
        } else if (j == n2) {
            A[l] = L[i++];
        } else if (L[i] <= R[j]) {
            A[l] = L[i++];
        } else {
            A[l] = R[j++];
        }
    }
}

void merge_sort(int A[], int p, int r, int k, int s) {
    if (COUNT(p, r, k, s) > 1) {
        int q = (p + r) / 2;
        merge_sort(A, p, q, k, s);
        merge_sort(A, q, r, k, s);
        merge(A, p, q, r, k, s);
    }
}

/*
 * Finally, the min heap from exercise 6.5-3, modified to store items instead
 * of ints. When I first wrote it, I made an error in the implementation and
 * that sent me in a hour-long debugging session. C is fun.
 *
 * Also, there is a new heap operation (min_heap_push_pop) that is a faster
 * than heap_extract_min and then min_heap_insert.
 */

#define PARENT(i) ((i - 1) / 2)
#define LEFT(i)   (2 * i + 1)
#define RIGHT(i)  (2 * i + 2)

item heap_minimum(heap_t *heap) {
    return heap->elements[0];
}

void min_heapify(heap_t *heap, int i) {
    int left  = LEFT(i),
        right = RIGHT(i),
        smallest;

    if (left < heap->heap_size && heap->elements[left].value < heap->elements[i].value) {
        smallest = left;
    } else {
        smallest = i;
    }

    if (right < heap->heap_size && heap->elements[right].value < heap->elements[smallest].value) {
        smallest = right;
    }

    if (smallest != i) {
        item tmp = heap->elements[i];
        heap->elements[i] = heap->elements[smallest];
        heap->elements[smallest] = tmp;

        min_heapify(heap, smallest);
    }
}

item heap_extract_min(heap_t *heap) {
    if (heap->heap_size == 0) {
        fprintf(stderr, "heap underflow");
        exit(0);
    }

    item min = heap->elements[0];
    heap->elements[0] = heap->elements[heap->heap_size - 1];
    heap->heap_size--;
    min_heapify(heap, 0);

    return min;
}

void heap_decrease_key(heap_t *heap, int i, item key) {
    if (key.value > heap->elements[i].value) {
        fprintf(stderr, "new key is larger than current key");
        exit(0);
    }

    heap->elements[i].value = key.value;
    while (i > 0 && heap->elements[PARENT(i)].value > heap->elements[i].value) {
        item tmp = heap->elements[PARENT(i)];
        heap->elements[PARENT(i)] = heap->elements[i];
        heap->elements[i] = tmp;
        i = PARENT(i);
    }
}

void min_heap_insert(heap_t *heap, item key) {
    if (heap->length == heap->heap_size) {
        fprintf(stderr, "heap overflow");
        exit(0);
    }

    heap->elements[heap->heap_size].value = INT_MAX;
    heap->elements[heap->heap_size].s = key.s;
    heap->heap_size++;
    heap_decrease_key(heap, heap->heap_size - 1, key);
}

item min_heap_push_pop(heap_t *heap, item new) {
    item result = heap->elements[0];
    heap->elements[0] = new;
    min_heapify(heap, 0);
    return result;
}