多线程排序问题
前几天面试碰到这样一个问题,和多线程排序相关,题目内容大概是下面这样: 从输入端会来一连串无序且不重复的数字,例如:
3, 6, 2, 4, 1, 9, 5, ...
接受端接收到这些数字之后,需要对这些数字排序,并且在适当的时候按原子递增输出,例如
接收数字 当前集合 是否输出
3 {3} 否
6 {3, 6} 否
2 {2, 3, 6} 否
4 {2, 3, 4, 6} 否
1 {1, 2, 3, 4, 6} 是,输出1,2,3,4
9 {6, 9} 否
5 {5, 6, 9} 否,输出5,6
... {9,...} 否
这个排序问题在单线程做的时候很简单,但是现在我有100个线程,我希望这100个线程能够尽可能地并发完成这个任务(假设输入输出都不是瓶颈),应该如何设计数据结构呢?
下面是我想到的解法: 首先,维护一个全局变量wait_cnt表示下一个等待的数字,以及一个线程池,维护这100个线程
std::atomic<uint> wait_cnt = 1;
std::ThreadPool<100> thread_pool;
然后是维护一系列的桶(大于100个),并且每个桶都有负责存储的数据范围,第i个桶保存的范围为[i * bucket_max_size, (i + 1) * max_size),桶内的数据是保持有序的。每个桶里有一把锁,只有持有该锁的线程才能对桶里的数据做处理。
const uint bucket_max_size = 1,000,000;
struct Bucket {
std::mutex latch_;
std::vector<uint> counts_;
// 实现顺序插入算法
void insert(uint count);
bool exist();
void remove();
}
std::unordered_map<int, Bucket *> buckets;
然后,当一个新的输入来了时候,线程会判断这个数字是否等于wait_cnt
,如果不是则只需要写入数据到对应的桶里即可;如果等于,则需要输出当前数字,然后找到下一个该输出的数字输出,如果没有找到,则将wait_cnt
设置为这个数字
void worker(uint count) {
if(count != wait_cnt) {
// 插入
int bucket_index = count / bucket_max_size;
auto bucket = buckets[bucket_index];
std::scoped_lock<std::mutex> lock{bucket.latch_};
bucket->insert(count);
} else {
// 输出并删除
std::cout << count ;
int next_count = wait_cnt + 1;
while(true) {
int bucket_index = next_count / bucket_max_size;
auto bucket = buckets[bucket_index];
std::scoped_lock<std::mutex> lock{bucket.latch_};
while(true) {
if(next_count / bucket_max_size == bucket_index && bucket->exist(next_count)) {
std::cout << " " << next_count;
bucket->remove(next_count);
next_count++;
} else {
break;
}
}
if(next_count / bucket_max_size == bucket_index) {
wait_cnt = next_count;
break;
}
}
}
}