My goal is to reduce the following read_stream_***() functions into a single generic function that can be passed different streams.
use async_std::net::TcpStream;
use async_std::{ task };
use async_std::io::{ stdin, BufReader, Stdin };
use async_std:: { prelude::* };
use futures::{select, FutureExt, AsyncRead };
pub async fn read_stream_stdin(streem:Stdin) -> Result<(), std::io::Error>
{
let mut lines_from_stream = BufReader::new(streem).lines().fuse();
loop {
select! {
line = lines_from_stream.next().fuse() => match line {
Some(line) => {
println!("{:?}",line?);
}
None => break,
}
}
}
Ok(())
}
pub async fn read_stream_tcp(streem:TcpStream) -> Result<(), std::io::Error>
{
let mut lines_from_stream = BufReader::new(streem).lines().fuse();
loop {
select! {
line = lines_from_stream.next().fuse() => match line {
Some(line) => {
println!("{:?}",line?);
}
None => break,
}
}
}
Ok(())
}
pub async fn connect_tcp_server(host_port:&str) -> Result<(), std::io::Error>
{
let streem = TcpStream::connect(host_port).await;
let _result = task::block_on(read_stream_tcp(streem?));
Ok(())
}
fn main() -> Result<(), std::io::Error> {
task::spawn( connect_tcp_server("127.0.0.1:8081") );
task::block_on(read_stream_stdin(stdin()))
}
The Generic Attempt:
pub async fn read_stream<T>(streem:T) -> Result<(), std::io::Error>
{
let mut lines_from_stream = BufReader::new(streem).lines().fuse();
loop {
select! {
line = lines_from_stream.next().fuse() => match line {
Some(line) => {
println!("{:?}",line?);
}
None => break,
}
}
}
Ok(())
}
The Cargo.toml:
[package]
name = "gen_func"
version = "0.1.0"
edition = "2021"
[dependencies]
async-std = "1.9.0"
futures = "0.3.21"
I attempted <T: async_std::io::Read>, but fuse() and lines() are not implemented for it, and AsyncRead is not found in async_std::io. I found AsyncRead in the futures crate, but again fuse() and lines() were not implemented. I am not set on this read pattern. I am new to Rust and am trying to build up a source library for future programming tasks.
First, as pointed out by kmdreko, the logic of your function(s) can be greatly simplified (at least based on the information given):
pub async fn read_stream_tcp(stream: TcpStream) -> Result<(), std::io::Error> {
let mut lines = BufReader::new(stream).lines();
while let Some(line) = lines.next().await {
println!("{line:?}");
}
Ok(())
}
Then, to figure out how to make this generic, you can just let the compiler tell you what it needs:
pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
{
let mut lines = BufReader::new(stream).lines();
while let Some(line) = lines.next().await {
println!("{line:?}");
}
Ok(())
}
Notice the lack of where clauses or other constraints on T. The compiler will now complain:
error[E0277]: the trait bound `T: async_std::io::Read` is not satisfied
--> src/main.rs:15:36
|
15 | let mut lines = BufReader::new(stream).lines();
| -------------- ^^^^^^ the trait `async_std::io::Read` is not implemented for `T`
| |
| required by a bound introduced by this call
|
note: required by a bound in `async_std::io::BufReader::<R>::new`
--> /home/lucas/.cargo/registry/src/github.com-1ecc6299db9ec823/async-std-1.12.0/src/io/buf_reader.rs:55:9
|
55 | impl<R: io::Read> BufReader<R> {
| ^^^^^^^^ required by this bound in `async_std::io::BufReader::<R>::new`
help: consider restricting type parameter `T`
|
13 | pub async fn read_stream<T: async_std::io::Read>(stream: T) -> Result<(), std::io::Error>
| +++++++++++++++++++++
Applying the compiler's suggestions (adding only the bound above leads to a follow-up error about Unpin) yields a full where clause of T: async_std::io::Read + std::marker::Unpin:
pub async fn read_stream<T>(stream: T) -> Result<(), std::io::Error>
where
T: async_std::io::Read + std::marker::Unpin,
{
let mut lines = BufReader::new(stream).lines();
while let Some(line) = lines.next().await {
println!("{line:?}");
}
Ok(())
}
async fn try_it() {
// These will now compile just fine
read_stream(async_std::io::stdin()).await.unwrap();
read_stream(TcpStream::connect("127.0.0.1:8080").await.unwrap()).await.unwrap();
}
"I attempted <T: async_std::io::Read> but fuse() and lines() are not implemented"
This suggests that you tried replacing BufReader::new(stream) at the same time. You can do that, but then you need to tell the compiler that the parameter is something that provides the lines() method. Either make the parameter the fixed type BufReader<T>, or, for a generic type, make the where clause T: async_std::io::BufRead + std::marker::Unpin.
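The difference between the two bounds can be sketched with the synchronous std::io traits, which have the same shape as their async-std counterparts. This is only an analogy, not the async code itself: in the async version the bounds also need Unpin and the loop uses .next().await. The function names lines_from_read and lines_from_bufread are hypothetical, and the functions return the lines instead of printing them so the result is easy to check.

```rust
use std::io::{self, BufRead, BufReader, Read};

/// Mirrors the `T: Read` bound: accepts any raw reader and adds the
/// buffering itself, so callers can pass an unbuffered stream directly.
fn lines_from_read<T: Read>(stream: T) -> io::Result<Vec<String>> {
    lines_from_bufread(BufReader::new(stream))
}

/// Mirrors the `T: BufRead` bound: `lines()` is available without wrapping,
/// but the caller has to supply something that is already buffered.
fn lines_from_bufread<T: BufRead>(reader: T) -> io::Result<Vec<String>> {
    // Collecting an iterator of `io::Result<String>` stops at the first error.
    reader.lines().collect()
}
```

With these definitions, lines_from_read(std::io::stdin()) should compile because Stdin implements Read, whereas lines_from_bufread needs std::io::stdin().lock() or an explicit BufReader::new(...), since the caller is now responsible for buffering.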