当前位置: 首页 > news >正文

网站建设情况2019年度最火关键词

网站建设情况,2019年度最火关键词,深圳人才热线招聘官网,如何做vip视频网站Rust 学习笔记:Stream Rust 学习笔记:Stream流组合流合并流 Rust 学习笔记:Stream 许多概念天然适合用 Stream 表示: 队列中逐渐可用的项目文件系统中逐渐拉取的数据块网络中随时间到达的数据 流 消息传递中异步的 recv 方法会…

Rust 学习笔记:Stream

  • Rust 学习笔记:Stream
    • 组合流
    • 合并流

Rust 学习笔记:Stream

许多概念天然适合用 Stream 表示:

  • 队列中逐渐可用的项目
  • 文件系统中逐渐拉取的数据块
  • 网络中随时间到达的数据

消息传递中异步的 recv 方法会随时间产生一系列项目,称为流(Stream)。

迭代器和异步通道接收器之间有两个不同之处。第一个区别是时间:迭代器是同步的,而通道接收器是异步的。第二个是 API。当直接使用 Iterator 时,调用它的同步 next 方法,而对于 trpl::Receiver 流,我们调用异步 recv 方法。

Streams 类似于一种异步形式的迭代器。迭代器和 Rust 中的流之间的相似性意味着我们实际上可以从任何迭代器创建流。与使用迭代器一样,我们可以调用流的 next 方法,然后等待输出。

Stream trait 定义了一个底层接口,它有效地结合了 Iterator 和 Future trait。StreamExt 在 Stream 之上提供了一组更高级的 API,包括 next 方法以及其他类似于 Iterator trait 所提供的实用方法。Stream 和 StreamExt 还不是 Rust 标准库的一部分,但大多数生态系统的 crate 使用相同的定义。

Ext 是扩展的缩写,是 Rust 社区中用于将一个特性扩展到另一个特性的常用模式。

use trpl::StreamExt;fn main() {trpl::run(async {let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];let iter = values.iter().map(|n| n * 2);let mut stream = trpl::stream_from_iter(iter);while let Some(value) = stream.next().await {println!("The value was: {value}");}});
}

程序从一个数字数组开始,将其转换为迭代器,然后调用 map 函数对所有值进行翻倍操作。使用 trpl::stream_from_iter 函数将迭代器转换为流。接下来,当流中的项到达时,使用 while let 循环遍历它们。

程序输出:

The value was: 2
The value was: 4
The value was: 6
The value was: 8
The value was: 10
The value was: 12
The value was: 14
The value was: 16
The value was: 18
The value was: 20

既然 StreamExt 已经在作用域中,我们可以使用它的所有实用方法,就像使用迭代器一样。

例如,我们使用 filter 方法过滤除 3 和 5 的倍数以外的所有内容。

use trpl::StreamExt;fn main() {trpl::run(async {let values = 1..101;let iter = values.map(|n| n * 2);let stream = trpl::stream_from_iter(iter);let mut filtered =stream.filter(|value| value % 3 == 0 || value % 5 == 0);while let Some(value) = filtered.next().await {println!("The value was: {value}");}});
}

组合流

因为流是 future,我们可以将它们与任何其他类型的 future 一起使用,并以有趣的方式组合它们。

让我们首先构建一个小消息流,作为我们网络数据流的替代:

use trpl::{ReceiverStream, Stream, StreamExt};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = trpl::channel();let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];for message in messages {tx.send(format!("Message: '{message}'")).unwrap();}ReceiverStream::new(rx)
}fn main() {trpl::run(async {let mut messages = get_messages();while let Some(message) = messages.next().await {println!("{message}");}});
}

首先,我们创建一个名为 get_messages 的函数,它返回 impl Stream<Item = String>。为了实现它,我们创建了一个异步通道,并通过通道发送一系列字符串。我们还使用了一个新的类型:ReceiverStream,它通过 next 方法将 rx 接收器从 trpl::channel 转换为流。

回到 main 中,我们使用 while let 循环打印流中的所有消息。当我们运行这段代码时,我们得到了我们所期望的结果:

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

这些用常规的 Receiver 或 Iterator API 都能完成。让我们添加一个需要流的特性:添加一个适用于流中的每个项的超时,以及我们发出的项的延迟。

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];for (index, message) in messages.into_iter().enumerate() {let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };trpl::sleep(Duration::from_millis(time_to_sleep)).await;tx.send(format!("Message: '{message}'")).unwrap();}});ReceiverStream::new(rx)
}fn main() {trpl::run(async {let mut messages =pin!(get_messages().timeout(Duration::from_millis(200)));while let Some(result) = messages.next().await {match result {Ok(message) => println!("{message}"),Err(reason) => eprintln!("Problem: {reason:?}"),}}})
}

我们首先使用 timeout 方法向流添加一个超时,该方法来自 StreamExt trait。然后更新 while let 循环体,因为流现在返回一个 Result。Ok 变体表示消息及时到达;Err 变体表示在任何消息到达之前超时已经过了。我们匹配该结果,并在成功接收消息时打印消息,或者打印关于超时的通知。

接着为发送的消息添加一个可变延迟。在 get_messages 函数中,我们对偶数索引项应用 100 ms 的延迟,对奇索引项应用 300 ms 的延迟,因为我们的超时是 200 ms,所以这应该会影响一半的消息。

要在 get_messages 函数中的消息之间休眠而不阻塞,我们需要使用 async。然而,我们不能使 get_messages 本身成为一个异步函数,因为那样我们将返回 Future<Output = Stream<Item = String>> 而不是 Stream<Item = String>>。因此,我们将 get_messages 保留为返回流的常规函数,并生成一个任务来处理异步 sleep 调用。

请记住:在给定的 future 内,一切都是线性发生的。并发发生在 future 之间。

调用者必须等待 get_messages 本身来访问流,这将要求它在返回接收方流之前发送所有消息,包括每个消息之间的睡眠延迟。因此,我们设置的超时将是无用的。Stream 本身不会有延迟,它们都可能在流可用之前发生。

程序输出:

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

在接收偶数索引项消息前,都出现 Problem: Elapsed(()) 错误。

发送偶数索引项消息后需要休眠 300 ms,main 函数中轮询流的间隔是 200 ms。

第一次轮询的 result 匹配到 Err(reason),于是打印错误。

超时并不会阻止消息最终到达。我们仍然获得所有原始消息,因为我们的通道是无界的:它可以容纳内存中可以容纳的尽可能多的消息。

第二次轮询时,消息已经到达,result 匹配到 Ok(message),于是打印接收到的消息。

合并流

首先,让我们创建另一个流。在无限循环中,每隔 1 ms 发送一个数字。对于 async,只要在循环的每次迭代中至少有一个等待点,就不会阻塞其他任何东西。

fn get_intervals() -> impl Stream<Item = u32> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let mut count = 0;loop {trpl::sleep(Duration::from_millis(1)).await;count += 1;tx.send(count).unwrap();}});ReceiverStream::new(rx)
}

返回类型将是 impl Stream<Item = u32>。因为所有这些都被封装在由 spawn_task创 建的任务中,所以所有这些(包括无限循环)都将随着运行时一起被清理。

回到 main 函数的 async 块,我们可以尝试合并消息和间隔流:

fn main() {trpl::run(async {let messages = get_messages().timeout(Duration::from_millis(200));let intervals = get_intervals();let merged = messages.merge(intervals);})
}

merge 方法将多个流合并为一个流,该流在项可用时立即从任何流生成项,而不强加任何特定的顺序。

但是,这个 merge 调用不能编译!这是因为这两个流具有不同的类型。messages 流的类型为 Timeout<impl Stream<Item = String>>,其中 Timeout 是为超时调用实现 Stream 的类型。interval 流的类型为 impl Stream<Item = u32>。要合并这两个流,我们需要转换其中一个以匹配另一个。

修改 interval 流:

fn main() {trpl::run(async {let messages = get_messages().timeout(Duration::from_millis(200));let intervals = get_intervals().map(|count| format!("Interval: {count}")).timeout(Duration::from_secs(10));let merged = messages.merge(intervals);let mut stream = pin!(merged);while let Some(result) = stream.next().await {match result {Ok(message) => println!("{message}"),Err(reason) => eprintln!("Problem: {reason:?}"),}}})
}

首先,我们使用 map 方法将流中的数字转换为字符串。

其次,我们需要匹配 messages 流的类型 Timeout<…>,这里我们用 timeout 创建了一个超时。

最后,我们需要使流可变,以便 while let 循环的 next 调用可以遍历流,并将其 pin,以便这样做是安全的。

运行程序,将会出现两个问题。首先,它永远不会停止!其次,来自英文字母的消息将被隐藏在所有间隔计数器消息的中间。

...
Interval: 329
Interval: 330
Interval: 331
Interval: 332
Interval: 333
Interval: 334
Interval: 335
Interval: 336
Interval: 337
Interval: 338
Message: 'd'
Interval: 339
Interval: 340
Interval: 341
Interval: 342
Interval: 343
Interval: 344
Interval: 345
Interval: 346
...

修改程序,解决这两个问题:

fn main() {trpl::run(async {let messages = get_messages().timeout(Duration::from_millis(200));let intervals = get_intervals().map(|count| format!("Interval: {count}")).throttle(Duration::from_millis(100)).timeout(Duration::from_secs(10));let merged = messages.merge(intervals).take(20);let mut stream = pin!(merged);while let Some(result) = stream.next().await {match result {Ok(message) => println!("{message}"),Err(reason) => eprintln!("Problem: {reason:?}"),}}})
}

首先,我们在 intervals 流上使用 throttle 方法。节流是一种限制函数调用速率的方法。在这种情况下,限制流轮询的频率为每 100 ms 一次,这样 intervals 流就不会压倒 messages 流。

take 方法用于限制从流中接受的项的数量。我们将 take 方法应用于合并的流,因为我们想限制最终的输出,而不仅仅是一个流或另一个流。

现在,当我们运行程序时,它在从合并流中取出 20 个项后就停止。

Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12

尽管我们有一个源流可以每毫秒产生一个事件,但是节流调用产生了一个新的流,它包装了原始流,这样原始流就只能以节流速率轮询,而不是它自己的“本地”速率。我们并非忽略 intervals 流发送的消息,而是从一开始就不会产生这些消息!这是 Rust future 在工作中固有的“懒惰”,允许我们选择我们的性能特征。

我们还需要处理最后一件事:错误!对于这两种基于通道的流,当通道的另一端关闭时,send 调用可能会失败。到目前为止,我们通过调用 unwrap 忽略了这种可能性,但在一个行为良好的应用程序中,我们应该显式处理错误,至少通过结束循环,这样我们就不会再发送任何消息。

修改两个函数:

fn get_messages() -> impl Stream<Item = String> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];for (index, message) in messages.into_iter().enumerate() {let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };trpl::sleep(Duration::from_millis(time_to_sleep)).await;if let Err(send_error) = tx.send(format!("Message: '{message}'")) {eprintln!("Cannot send message '{message}': {send_error}");break;}}});ReceiverStream::new(rx)
}fn get_intervals() -> impl Stream<Item = u32> {let (tx, rx) = trpl::channel();trpl::spawn_task(async move {let mut count = 0;loop {trpl::sleep(Duration::from_millis(1)).await;count += 1;if let Err(send_error) = tx.send(count) {eprintln!("Could not send interval {count}: {send_error}");break;};}});ReceiverStream::new(rx)
}

两个函数都使用了一个简单的错误策略:打印问题,然后跳出循环。

http://www.fp688.cn/news/160584.html

相关文章:

  • 专门做汽车配件的网站贵阳搜索引擎排名推广
  • 响应式网站开发原理seo排名官网
  • 有哪些网站做国外生意的长沙官网seo技巧
  • 广元如何做百度的网站抖音企业推广
  • 有关做内购的网站电商网站设计模板
  • 经营网站备案宁波如何做抖音seo搜索优化
  • seo优化网站模板百色seo快速排名
  • 网站建设及上线流程whois查询 站长工具
  • 手机网站设计要求百度推广手机版
  • 网站需要做实名认证如何做苏州seo
  • 做门户网站啥意思网站后台管理系统
  • 网站开发环境怎么写外链发布论坛
  • 哪个基层司法所网站做的比较好刚刚中国出啥大事了
  • 可以做彩票网站的工作室北京网站开发
  • 建设购物网站网络推广专员所需知识
  • 做破解的网站中文搜索引擎大全
  • 保定聊城网站建设网络销售怎么学
  • 建设网站实施条件全国疫情高峰时间表最新
  • 专门做网站建设的维普网论文收录查询
  • 网站内容管理平台百度账号批发网
  • 微云怎么做网站营销技巧五步推销法
  • 网站开发步骤需求分析超级外链推广
  • wordpress建站知乎西安网站建设公司十强
  • 男女生做羞羞网站情感式软文广告
  • 泰州企业自助建站系统友情链接查询工具
  • 做网站的软件去哪里买优化营商环境心得体会个人
  • 红河做网站的公司软文推广是什么
  • 做擦边球的网站营销广告文案
  • 网站关键词优化实验结果分析免费外链工具
  • 要服务网站建设推广网页