多线程排序问题

前几天面试碰到这样一个问题,和多线程排序相关,题目内容大概是下面这样: 从输入端会来一连串无序且不重复的数字,例如:

3, 6, 2, 4, 1, 9, 5, ...

接受端接收到这些数字之后,需要对这些数字排序,并且在适当的时候按原子递增输出,例如

接收数字    当前集合        是否输出
    3      {3}               否   
    6      {3, 6}            否    
    2      {2, 3, 6}         否
    4      {2, 3, 4, 6}      否
    1      {1, 2, 3, 4, 6}   是,输出1,2,3,4
    9      {6, 9}            否
    5      {5, 6, 9}         否,输出5,6
    ...    {9,...}           否

这个排序问题在单线程做的时候很简单,但是现在我有100个线程,我希望这100个线程能够尽可能地并发完成这个任务(假设输入输出都不是瓶颈),应该如何设计数据结构呢?

下面是我想到的解法: 首先,维护一个全局变量wait_cnt表示下一个等待的数字,以及一个线程池,维护这100个线程

std::atomic<uint> wait_cnt = 1;
std::ThreadPool<100> thread_pool;

然后是维护一系列的桶(大于100个),并且每个桶都有负责存储的数据范围,第i个桶保存的范围为[i * bucket_max_size, (i + 1) * max_size),桶内的数据是保持有序的。每个桶里有一把锁,只有持有该锁的线程才能对桶里的数据做处理。

const uint bucket_max_size = 1,000,000;
struct Bucket {
    std::mutex latch_;
    std::vector<uint> counts_;

    // 实现顺序插入算法
    void insert(uint count);

    bool exist();
    
    void remove();
}

std::unordered_map<int, Bucket *> buckets;

然后,当一个新的输入来了时候,线程会判断这个数字是否等于wait_cnt,如果不是则只需要写入数据到对应的桶里即可;如果等于,则需要输出当前数字,然后找到下一个该输出的数字输出,如果没有找到,则将wait_cnt设置为这个数字

void worker(uint count) {
    if(count != wait_cnt) {
        // 插入
        int bucket_index = count / bucket_max_size;
        auto bucket = buckets[bucket_index];
        std::scoped_lock<std::mutex> lock{bucket.latch_};
        bucket->insert(count);
    } else {
        // 输出并删除
        std::cout << count ;
        int next_count = wait_cnt + 1;
        while(true) {
            int bucket_index = next_count / bucket_max_size;
            auto bucket = buckets[bucket_index];
            std::scoped_lock<std::mutex> lock{bucket.latch_};
            while(true) {
                if(next_count / bucket_max_size == bucket_index && bucket->exist(next_count)) {
                    std::cout << " " << next_count;
                    bucket->remove(next_count);
                    next_count++;
                } else {
                    break;
                }
            }
            if(next_count / bucket_max_size == bucket_index) {
                wait_cnt = next_count;
                break;
            }
        }
    }
}