Given an integer $n$, and 2 real sequences $\{a_1, \dots, a_n\}$ and $\{b_1, \dots, b_n\}$, with $a_i$, $b_i$ > 0, for all $i$. For a fixed $m < n$ let $\{P_1, \dots, P_m\}$ be a partition of the set $\{1, \dots, n\}$ as in $P_1 \cup \dots \cup P_m$ = $\{1, \dots, n\}$, with the $P_i$'s pairwise disjoint. I wish to find a partition of size $m$ that solves
$$\max_{P=\{P_1, ..., P_m\}}\sum_{j=1}^{m}\frac{(\sum_{i \in P_j}a_i)^2}{\sum_{i \in P_j}b_i}$$
I am really looking for an algorithm which solves the problem in polynomial time. A brute-force solution is not feasible: the number of candidate partitions is the Stirling number of the second kind $S(n, m)$ (a restricted Bell number), and $n$ can exceed 1e6 in realistic cases.
I would like to establish that the optimal partition is monotonic in increasing values of $a/b$, in the following sense: if the indices $1, \dots, n$ are sorted by increasing $a_i/b_i$, then each $P_j$ of the optimal partition is a block of consecutive indices. I believe this is the case — can someone provide a proof?
If so, the brute-force search could be reduced to checking on the order of $\binom{n}{m-1}$ consecutive partitions — still expensive, but a significant saving.
The script below solves the problem by brute-force. For example, a sample run with $n = 12$, $m = 3$, gives an optimal partition of (expressed in indices of the sorted sequence $a/b$):
[[0, 1, 2, 3, 4, 5, 6, 7], [8, 9], [10, 11]] which is monotonic, as claimed.
import multiprocessing
import concurrent.futures  # retained from original (unused here)
import time  # FIX: SimpleTask.__call__ used time.sleep without this import
from functools import lru_cache, partial
from itertools import chain, islice

import numpy as np

# n: number of points to partition
NUM_POINTS = 12
# m: number of subsets in each partition
PARTITION_SIZE = 4

# Fixed seed so trials are reproducible.
rng = np.random.RandomState(55)


def knuth_partition(ns, m):
    """Generate every partition of the sequence ``ns`` into exactly ``m``
    non-empty subsets.

    This is Knuth's Algorithm H (TAOCP Vol. 4A, 7.2.1.5) written as a pair
    of mutually recursive generators ``f`` (forward) and ``b`` (backward);
    it requires m >= 2.  Each yielded value is a list of ``m`` lists whose
    union is ``ns``.
    """
    def visit(n, a):
        # a[1..n] holds the subset index of each element; assemble subsets.
        ps = [[] for _ in range(m)]
        for j in range(n):
            ps[a[j + 1]].append(ns[j])
        return ps

    def f(mu, nu, sigma, n, a):
        # Forward direction of Algorithm H.
        if mu == 2:
            yield visit(n, a)
        else:
            for v in f(mu - 1, nu - 1, (mu + sigma) % 2, n, a):
                yield v
        if nu == mu + 1:
            a[mu] = mu - 1
            yield visit(n, a)
            while a[nu] > 0:
                a[nu] = a[nu] - 1
                yield visit(n, a)
        elif nu > mu + 1:
            if (mu + sigma) % 2 == 1:
                a[nu - 1] = mu - 1
            else:
                a[mu] = mu - 1
            if (a[nu] + sigma) % 2 == 1:
                for v in b(mu, nu - 1, 0, n, a):
                    yield v
            else:
                for v in f(mu, nu - 1, 0, n, a):
                    yield v
            while a[nu] > 0:
                a[nu] = a[nu] - 1
                if (a[nu] + sigma) % 2 == 1:
                    for v in b(mu, nu - 1, 0, n, a):
                        yield v
                else:
                    for v in f(mu, nu - 1, 0, n, a):
                        yield v

    def b(mu, nu, sigma, n, a):
        # Backward direction of Algorithm H.
        if nu == mu + 1:
            while a[nu] < mu - 1:
                yield visit(n, a)
                a[nu] = a[nu] + 1
            yield visit(n, a)
            a[mu] = 0
        elif nu > mu + 1:
            if (a[nu] + sigma) % 2 == 1:
                for v in f(mu, nu - 1, 0, n, a):
                    yield v
            else:
                for v in b(mu, nu - 1, 0, n, a):
                    yield v
            while a[nu] < mu - 1:
                a[nu] = a[nu] + 1
                if (a[nu] + sigma) % 2 == 1:
                    for v in f(mu, nu - 1, 0, n, a):
                        yield v
                else:
                    for v in b(mu, nu - 1, 0, n, a):
                        yield v
            if (mu + sigma) % 2 == 1:
                a[nu - 1] = 0
            else:
                a[mu] = 0
        if mu == 2:
            yield visit(n, a)
        else:
            for v in b(mu - 1, nu - 1, (mu + sigma) % 2, n, a):
                yield v

    n = len(ns)
    a = [0] * (n + 1)
    for j in range(1, m + 1):
        a[n - m + j] = j - 1
    return f(m, n, 0, n, a)


@lru_cache(maxsize=None)
def Bell_n_k(n, k):
    """Number of partitions of {1,...,n} into k non-empty subsets: the
    Stirling number of the second kind S(n, k), a restricted Bell number.

    FIX: memoized with lru_cache -- the plain double recursion is
    exponential in n; with caching it is O(n*k).
    """
    if n == 0 or k == 0 or k > n:
        return 0
    if k == 1 or k == n:
        return 1
    return k * Bell_n_k(n - 1, k) + Bell_n_k(n - 1, k - 1)


def slice_partitions(partitions):
    """Split ``partitions`` into roughly NUM_WORKERS contiguous chunks
    (after a shuffle, to balance the per-chunk workload).
    """
    # Have to consume it; can't split work on a generator.
    partitions = list(partitions)
    num_partitions = len(partitions)
    # FIX: the original used step int(num_partitions/NUM_WORKERS), which is 0
    # (range() ValueError) whenever num_partitions < NUM_WORKERS.
    step = max(1, num_partitions // NUM_WORKERS)
    bin_ends = list(range(0, num_partitions, step))
    # FIX: always close the final bin.  The original guard
    # ``if num_partitions/NUM_WORKERS`` was a truthy float, i.e. always
    # appended for num_partitions > 0 -- the modulus was clearly intended.
    if not bin_ends or bin_ends[-1] != num_partitions:
        bin_ends = bin_ends + [num_partitions]
    islice_on = list(zip(bin_ends[:-1], bin_ends[1:]))
    rng.shuffle(partitions)
    slices = [list(islice(partitions, *ind)) for ind in islice_on]
    return slices


def reduce(return_values, fn):
    """Fold the per-worker results with ``fn`` (min/max), comparing by the
    first tuple element (the objective value).

    NOTE: the name shadows functools.reduce (not imported here); kept for
    backward compatibility with existing callers.
    """
    return fn(return_values, key=lambda x: x[0])


class SimpleTask(object):
    """Toy task for exercising the worker pool; not used by the search.

    FIX: relies on ``time``, which the original script never imported --
    the import now sits at the top of the file.
    """

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(1)
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


class Task(object):
    """Score a batch of candidate partitions and keep the best one."""

    def __init__(self, a, b, partition):
        self.partition = partition
        # Bind the data arrays once; __call__ then only needs the batch.
        self.task = partial(Task._task, a, b)

    def __call__(self):
        return self.task(self.partition)

    @staticmethod
    def _task(a, b, partitions, report_each=1000):
        """Return ``(best objective, best partition, per-subset values)``
        over ``partitions``, scoring each candidate partition by
        sum_j (sum_{i in P_j} a_i)^2 / (sum_{i in P_j} b_i).
        """
        max_sum = float('-inf')
        arg_max = -1
        max_part_vertex = None  # FIX: was unbound when ``partitions`` is empty
        for ind, part in enumerate(partitions):
            val = 0
            part_val = [0] * len(part)
            part_vertex = [0] * len(part)
            for part_ind, p in enumerate(part):
                part_sum = sum(a[p]) ** 2 / sum(b[p])
                part_vertex[part_ind] = part_sum
                part_val[part_ind] = part_sum
                val += part_sum
            if val > max_sum:
                max_sum = val
                arg_max = part
                max_part_vertex = part_vertex
        return (max_sum, arg_max, max_part_vertex)


class Worker(multiprocessing.Process):
    """Process that drains tasks from a JoinableQueue until it receives a
    ``None`` poison pill, pushing each task's result onto the result queue.
    """

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            task = self.task_queue.get()
            if task is None:
                # Poison pill: acknowledge and exit.
                self.task_queue.task_done()
                break
            result = task()
            self.task_queue.task_done()
            self.result_queue.put(result)


# FIX: never 0 -- cpu_count() == 1 made the original divide by zero later.
NUM_WORKERS = max(1, multiprocessing.cpu_count() - 1)


if __name__ == '__main__':
    # FIX: the driver is guarded so that worker processes can import this
    # module safely under the 'spawn' start method, and plain imports of the
    # module do no work.
    INT_LIST = range(0, NUM_POINTS)

    num_partitions = Bell_n_k(NUM_POINTS, PARTITION_SIZE)
    partitions = knuth_partition(INT_LIST, PARTITION_SIZE)
    slices = slice_partitions(partitions)

    trial = 0
    while True:
        trial += 1
        a0 = rng.uniform(low=-0.0, high=100.0, size=NUM_POINTS)
        b0 = rng.uniform(low=-0.0, high=100.0, size=NUM_POINTS)

        # Sort both sequences by increasing a/b, to check the claim.
        ind = np.argsort(a0 / b0)
        (a, b) = (seq[ind] for seq in (a0, b0))

        tasks = multiprocessing.JoinableQueue()
        results = multiprocessing.Queue()
        workers = [Worker(tasks, results) for _ in range(NUM_WORKERS)]
        num_slices = len(slices)  # should equal the number of chunks made

        for worker in workers:
            worker.start()
        for sl in slices:
            tasks.put(Task(a, b, sl))
        for _ in range(NUM_WORKERS):
            tasks.put(None)  # one poison pill per worker
        tasks.join()

        # FIX: collect exactly num_slices results.  Queue.empty() is
        # unreliable -- a result can still be in the feeder pipe when the
        # producers finish, silently dropping it.
        allResults = [results.get() for _ in range(num_slices)]

        r_max = reduce(allResults, max)

        # Diagnostics on the winning partition: gaps in c = a/b across the
        # boundaries between consecutive subsets.
        c = a / b
        part = r_max[1]
        endpoints = [(left[-1], right[0])
                     for left, right in zip(part[:-1], part[1:])]
        d = [c[hi] - c[lo] for lo, hi in endpoints]
        r = [(c[hi] - c[lo]) / c[lo] for lo, hi in endpoints]
        all_diffs = np.concatenate([[np.nan], np.diff(c)])
        all_rets = np.concatenate([np.diff(c), [np.nan]]) / c
        max_diffs = sorted(all_diffs)[-(PARTITION_SIZE - 1):]
        max_rets = sorted(all_rets)[-(PARTITION_SIZE - 1):]

        # FIX: print the trial counter; the original reused a stale loop
        # index ``i`` (always NUM_WORKERS - 1).
        print('TRIAL: {} : max: {:4.6f} pttion: {!r}'.format(
            trial, *r_max[:-1]))

        try:
            # The monotonicity claim: in a/b-sorted indices, the optimal
            # partition consists of consecutive runs, so the flattened index
            # list steps by exactly 1.
            assert all(np.diff(list(chain.from_iterable(r_max[1]))) == 1)
        except AssertionError:
            import pdb
            pdb.set_trace()