Justin Lei

Justin Lei

Study forever

18 Oct 2021

CS110L Week 6


实现 parallel_map,使用多线程在向量上执行函数 F,返回所有执行结果

week6 实验主页在此

设计思路

分为主线程和工作线程两部分,主线程负责将输入的元素通过通道发送给工作线程,工作线程需要将执行结果返回给主线程,主线程负责将结果放入结果向量中,因此需要两个 channel。

由于需要将每个元素的执行结果按照原有的输入顺序返回,在给工作线程传递参数的时候需要将元素在向量中的位置一并发送。

注意 crossbeam_channel 在一端所有引用断开连接之后,另一段的 send/recv 才会跳出循环,因此注意找到所有的引用。在此处实现中,工作线程的输出端在主线程也会有一个引用,应该在所有工作线程创建完成后将该引用 drop 掉。

代码

fn parallel_map<T, U, F>(mut input_vec: Vec<T>, num_threads: usize, f: F) -> Vec<U>
where
    F: FnOnce(T) -> U + Send + Copy + 'static,
    T: Send + 'static,
    U: Send + 'static + Default,
{
    let mut output_vec: Vec<U> = Vec::with_capacity(input_vec.len());
    // TODO: implement parallel map!
    let (input_tx, input_rx) = crossbeam_channel::unbounded::<Data<T>>();
    let (output_tx, output_rx) = crossbeam_channel::unbounded::<Data<U>>();
    let mut threads = Vec::new();

    for _ in 0..num_threads {
        let input_rx = input_rx.clone();
        let output_tx = output_tx.clone();
        threads.push(
            thread::spawn(move || {
                while let Ok(received) = input_rx.recv() {
                    let output: Data<U> = Data { data: f(received.data), index: received.index };
                    output_tx.send(output).unwrap();
                }
                drop(output_tx);
            })
        );
    }

    drop(output_tx);

    for (index, data) in input_vec.into_iter().enumerate() {
        input_tx.send(Data { data, index }).unwrap();
    }

    drop(input_tx);
    
    while let Ok(received) = output_rx.recv() {
        if output_vec.len() <= received.index {
            let len = output_vec.len();
            for _ in 0..(received.index - len + 1) {
                output_vec.push(U::default());
            }
        }
        output_vec[received.index] = received.data;
    }

    for handle in threads {
        handle.join().expect("Panic occurs in a thread!");
    }

    output_vec
}