From 3f109c73d709c5d7641cb3cee792a09d114b1248 Mon Sep 17 00:00:00 2001 From: Till Hoeppner Date: Sun, 28 Jun 2015 18:51:49 +0200 Subject: Fix client --- src/client.rs | 78 ++++++++++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 35 deletions(-) (limited to 'src') diff --git a/src/client.rs b/src/client.rs index 80f68e2..d3a8542 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,10 +5,11 @@ use std::io::{ BufRead, BufReader, }; - use std::net::TcpStream; use std::borrow::Cow::{ self, Borrowed, Owned }; use std::sync::{ Arc, RwLock }; +use std::mem; +use std::cell::UnsafeCell; use carboxyl::{ Stream, Sink }; @@ -147,7 +148,7 @@ impl OwnedClient { self.send_message(cmd.to_message()) } - fn intern_listen(&mut self, events: Option) -> Result<()> + pub fn listen_with_callback(&mut self, on_event: F) -> Result<()> where F: Fn(&mut Client, &Message, Option) { let reader = BufReader::new(match self.stream { Some(StreamKind::Plain(ref s)) => StreamKind::Plain((*s).try_clone().unwrap()), @@ -162,44 +163,53 @@ impl OwnedClient { if let Ok(msg) = line { self.handle_event(&msg); - // If a callback is desired, try to parse the message - // into a Command or a Reply, and call back. - if let Some(ref on_event) = events { - let event = match Command::from_message(&msg) { - Some(m) => Some(Event::Command(m)), - None => match Reply::from_message(&msg) { - Some(r) => Some(Event::Reply(r)), - None => None - } - }; - - on_event(self, &msg, event); - } - - self.sink.send(msg) + // Try to parse the message into a Command or a Reply, and call back. + let event = match Command::from_message(&msg) { + Some(m) => Some(Event::Command(m)), + None => match Reply::from_message(&msg) { + Some(r) => Some(Event::Reply(r)), + None => None + } + }; + on_event(self, &msg, event); } } Result(Ok(())) } - pub fn listen(&mut self) -> Result<()> { - self.intern_listen::)>(None) - } + #[allow(mutable_transmutes)] + fn listen_with_events(&self) -> Result<()> { + let mut s: &mut OwnedClient = unsafe { mem::transmute(self) }; + let reader = BufReader::new(match self.stream { + Some(StreamKind::Plain(ref s)) => StreamKind::Plain((*s).try_clone().unwrap()), + #[cfg(feature = "ssl")] + Some(StreamKind::Ssl(ref s)) => StreamKind::Ssl((*s).try_clone().unwrap()), + None => return Result(Err(IrscError::NotConnected)) + }); - pub fn listen_with_callback(&mut self, events: F) -> Result<()> - where F: Fn(&mut Client, &Message, Option) { - self.intern_listen(Some(events)) + for line in reader.lines() { + let line = line.unwrap().parse(); + + if let Ok(msg) = line { + s.handle_event(&msg); + self.sink.send(msg); + } + } + Result(Ok(())) } pub fn into_shared(self) -> SharedClient { SharedClient { - client: Arc::new(RwLock::new(self)), + client: Arc::new(OwnedClientCell(UnsafeCell::new(self))), } } pub fn messages(&self) -> Stream { self.sink.stream() } } +struct OwnedClientCell(UnsafeCell); +unsafe impl Sync for OwnedClientCell {} + impl Client for OwnedClient { fn send_message(&mut self, msg: Message) -> Result<()> { self.send_raw(&msg.to_string()) @@ -208,28 +218,28 @@ impl Client for OwnedClient { #[derive(Clone)] pub struct SharedClient { - client: Arc>, + client: Arc, } impl SharedClient { pub fn messages(&self) -> Stream<(SharedClient, Message)> { let cl = SharedClient { client: self.client.clone() }; - self.client.read().unwrap().messages() - .map(move |m| { println!("Message!"); (cl.clone(), m) }) + unsafe { &*self.client.0.get() }.messages() + .map(move |m| (cl.clone(), m)) } - pub fn events(&self) -> Stream<(SharedClient, Message, Arc>)> { + pub fn events(&self) -> Stream<(SharedClient, Message, Event<'static>)> { self.messages().filter_map(|(cl, msg)| match Command::from_message(&msg) { - Some(m) => Some((cl, msg.clone(), Arc::new(Event::Command(m.clone()).to_static()))), + Some(m) => Some((cl, msg.clone(), Event::Command(m.clone()).to_static())), None => match Reply::from_message(&msg) { - Some(r) => Some((cl, msg.clone(), Arc::new(Event::Reply(r).to_static()))), + Some(r) => Some((cl, msg.clone(), Event::Reply(r).to_static())), None => None } }) } - pub fn listen(&self) -> Result<()> { - self.client.write().unwrap().listen() + pub fn listen_with_events(&mut self) -> Result<()> { + unsafe { &*self.client.0.get() }.listen_with_events() } pub fn commands(&self) -> Stream<(SharedClient, Message, Command<'static>)> { @@ -249,8 +259,6 @@ impl SharedClient { impl Client for SharedClient { fn send_message(&mut self, msg: Message) -> Result<()> { - if let Ok(mut guard) = self.client.write() { - guard.send_raw(&msg.to_string()) - } else { Result(Ok(())) } + unsafe { &mut *self.client.0.get() }.send_raw(&msg.to_string()) } } -- cgit v1.2.3