1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
use std::{pin::Pin, sync::Arc, collections::VecDeque, mem, task::Poll};
use futures_util::{Future, Stream, task, FutureExt};
use tokio::{net::{tcp::{OwnedReadHalf, OwnedWriteHalf}, TcpStream, ToSocketAddrs}, sync::Mutex, io::AsyncReadExt};
use crate::{network::{Connection, Error, Message, Event}, packet::Packet, parser::Parser, BYTE_BUFFER_SIZE};
pub struct Client {
nickname: Vec<u8>,
stream_read: Arc<Mutex<OwnedReadHalf>>,
stream_write: OwnedWriteHalf,
packet_queue: VecDeque<Packet>,
parser: Arc<Mutex<Parser>>,
get_packet_future: Pin<Box<dyn Future<Output = Result<Packet, Error>>>>,
}
impl Connection for Client {
fn connect<A, B>(addr: A, nickname: B) -> Pin<Box<dyn Future<Output = Result<Self, Error>> + 'static>>
where A: ToSocketAddrs + 'static, B: Into<Vec<u8>> + 'static, Self: Sized
{
Box::pin(async move {
let (stream_read, mut stream_write) = TcpStream::connect(addr).await?.into_split();
let stream_read = Arc::new(Mutex::new(stream_read));
let mut packet_queue = VecDeque::new();
let parser = Arc::new(Mutex::new(Parser::new()));
let mut get_packet_future = Box::pin({
let parser = parser.clone();
let stream_read = stream_read.clone();
get_packet(parser, stream_read)
});
Packet::SetNicknameRequest(nickname.into()).async_write(&mut stream_write).await?;
let nickname = loop {
let packet = mem::replace(&mut get_packet_future, Box::pin(get_packet(parser.clone(), stream_read.clone()))).await?;
match packet {
Packet::SetNickname(nickname) => break nickname,
packet => packet_queue.push_back(packet),
}
};
Ok(Self {
nickname,
stream_read,
stream_write,
packet_queue,
parser,
get_packet_future,
})
})
}
fn send<B>(&mut self, message: Message, recipients: Vec<B>) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>>
where B: Into<Vec<u8>>, B: 'static
{
Box::pin(async move {
let nickname = self.nickname.clone();
let recipients = recipients.into_iter().map(|r| r.into()).collect();
match message {
Message::Word(word) => Packet::SendWord { sender: nickname, recipients, word },
Message::Instructions(instructions) => Packet::SendInstructions { sender: nickname, recipients, instructions },
Message::List(list) => Packet::SendList { sender: nickname, recipients, list },
Message::Image(bytes) => Packet::SendImage { sender: nickname, recipients, bytes },
Message::Object(bytes) => Packet::SendObject { sender: nickname, recipients, bytes },
}.async_write(&mut self.stream_write).await?;
Ok(())
})
}
fn user_list(&mut self) -> Pin<Box<dyn Future<Output = Result<Vec<Vec<u8>>, Error>> + '_>> {
Box::pin(async move {
Packet::UserListRequest.async_write(&mut self.stream_write).await?;
Ok(loop {
let packet = mem::replace(&mut self.get_packet_future, Box::pin(get_packet(self.parser.clone(), self.stream_read.clone()))).await?;
match packet {
Packet::UserListResponse(user_list) => break user_list,
packet => self.packet_queue.push_back(packet),
}
})
})
}
fn change_nickname<B>(&mut self, nickname: B) -> Pin<Box<dyn Future<Output = Result<(), Error>> + '_>>
where B: Into<Vec<u8>>, B: 'static
{
Box::pin(async move {
Packet::ChangeNicknameRequest(nickname.into()).async_write(&mut self.stream_write).await?;
Ok(())
})
}
}
impl Stream for Client {
type Item = Result<Event, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(packet) = self.get_packet_future.poll_unpin(cx)? {
self.packet_queue.push_back(packet);
self.get_packet_future = Box::pin(get_packet(self.parser.clone(), self.stream_read.clone()));
}
match self.packet_queue.pop_front() {
Some(Packet::DeliverWord { sender, word }) => Poll::Ready(Some(Ok(Event::RecievedMessage { sender, message: Message::Word(word) }))),
Some(Packet::DeliverInstructions { sender, instructions }) => Poll::Ready(Some(Ok(Event::RecievedMessage { sender, message: Message::Instructions(instructions) }))),
Some(Packet::DeliverList { sender, list }) => Poll::Ready(Some(Ok(Event::RecievedMessage { sender, message: Message::List(list) }))),
Some(Packet::DeliverImage { sender, bytes }) => Poll::Ready(Some(Ok(Event::RecievedMessage { sender, message: Message::Image(bytes) }))),
Some(Packet::DeliverObject { sender, bytes }) => Poll::Ready(Some(Ok(Event::RecievedMessage { sender, message: Message::Object(bytes) }))),
Some(Packet::SetNickname(nickname)) => {
self.nickname = nickname.clone();
Poll::Ready(Some(Ok(Event::SetNickname(nickname))))
},
Some(Packet::ChangeNickname(nickname)) => {
self.nickname = nickname.clone();
Poll::Ready(Some(Ok(Event::ChangedNickname(nickname))))
},
Some(_) | None => {
cx.waker().wake_by_ref();
Poll::Pending
},
}
}
}
async fn get_packet(parser: Arc<Mutex<Parser>>, stream: Arc<Mutex<OwnedReadHalf>>) -> Result<Packet, Error> {
let mut parser = parser.lock().await;
let mut stream = stream.lock().await;
Ok(loop {
if let Some(packet) = parser.get_packet() {
break packet;
}
let mut bytes = [0; BYTE_BUFFER_SIZE];
let byte_count = stream.read(&mut bytes).await?;
parser.parse(&bytes[0..byte_count])?;
})
}