数据共享
事实上,线程之间进行数据共享是有很多种形式的,熟悉C语言编程的,应该知道的有
- 跨进程的 IPC 通信: socket , pipe, sharemem
- 进程内的共享变量访问,必须要增加锁的保护, mutex,semphore, 读写锁等等等
内核态的数据共享应该不涉及到IPC的通信(都在一个地址空间) 因此只会涉及到关于数据的原子访问 常见有
- atomic 原子变量
- spinlock保护的共享变量
- 其他锁保护的共享变量(太多了)
我们要看一下RUST 是如何定义数据共享的
多所有权模型
RC我们已经很熟悉了,RUST 提供的一个模型 就是使用多所有权模型 引用计数指针
由于Rc的引用计数更新不是原子的,因此只能适用于单线程模型,多线程模型 提供了他的孪生模型 Arc,
Arc保证了原子计数的更新是原子的
回到上一个小节的例子
use std::thread;
use std::sync::Arc;
fn main() {
let nums = Arc::new(vec![1,2,3,4]);
for n in 0..4 {
let ns = Arc::clone(&nums);
let child = thread::spawn( move || {
println!("Thread num: {}", ns[n]);
});
child.join().expect("Failed join child thread");
}
println!("main exit");
}
共享数据的保护
回忆引用计数指针的问题,我们之前讲过 如果希望修改引用计数内的类型,需要通过Refcell
use std::thread;
use std::sync::Arc;
use std::cell::RefCell;
fn main() {
let nums = Arc::new(vec![1,2,3,4]);
for n in 0..4 {
let ns = Arc::clone(&nums);
let child = thread::spawn( move || {
println!("Thread num: {}", ns.get(n).unwrap());
ns.clear();
});
child.join().expect("Failed join child thread");
}
println!("main exit");
}
我们可以看到上面代码报错了,提示无法修改Arc,请回答一下为什么
是否可以利用RefCell修改他的值? 试试看 为什么
互斥
RUST 提供了可以互斥的类型 保证共享变量的访问,当然实际上互斥类型可以有非常多种 根据锁的类型不同
我们这里简单介绍一种 Mutex
use std::sync::Mutex;
use std::thread;
fn main() {
let nums = Mutex::new(vec![1,2,3,4]);
let child = thread::spawn( move || {
println!("Thread num: {}", nums.lock().unwrap().get(0).unwrap());
});
child.join().expect("Failed join child thread");
println!("main exit");
}
Mutex的变量必须要先通过lock()方法 上锁成功以后才可以访问
共享可变性
有了Mutex的基础,再加上Arc 可以实现一个数据再多个所有权下的访问
use std::thread;
use std::sync::Arc;
use std::sync::Mutex;
fn main() {
let nums = Arc::new(Mutex::new(vec![]));
for n in 0..4 {
let ns = nums.clone();
let child = thread::spawn( move || {
let mut v = ns.lock().unwrap();// lock
v.push(n);
println!("Thread num: {:?}", v);
// v lifetime end, will unlock self
});
child.join().expect("Failed join child thread");
}
println!("main exit");
}
通过消息传递进行通信
除了之前讲过的,利用多所有权 和 互斥机制 实现变量可以共享访问之外,RUST 还提供了叫做消息传递的通信机制
利用了标准库中的 std::sync::mpsc 提供了一个无锁定的多生产者,单订阅者队列,作为彼此通信的共享消息队列
- channel: 一种异步的无限缓冲通道
- sync_channel : 同步的有界 缓冲通道
use std::thread;
use std::sync::mpsc::channel;
fn main() {
let(tx,rx) = channel();
let child = thread::spawn( move || {
while let Ok(n) = rx.recv() {
println!("recived: {}",n);
if n == 9 {
break;
}
}
});
for i in 0..10 {
tx.send(i).unwrap();
}
child.join().expect("Failed join child thread");
println!("main exit");
}
关于异步和同步:
- 异步通道,send不会阻塞(如果消费者速度小于生产者,内存可能耗尽);同步通道,大小有限,可以阻塞
关于多生产者和单一消费者:
- tx 支持Copy语义,可以被复制多份,也就可以被多个线程共同持有
- rx 不支持Copy,也就是所有权唯一,只能一个线程接收消息