diff --git a/Cargo.toml b/Cargo.toml index 5b35b0c..a613e09 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,3 +9,8 @@ handy_async = "0.2" rust-crypto = "0.2" num = "0.1" splay_tree = "0.2" + +[dev-dependencies] +clap = "2" +fibers = "0.1" +futures = "0.1" diff --git a/examples/srtpsrv.rs b/examples/srtpsrv.rs new file mode 100644 index 0000000..ade38da --- /dev/null +++ b/examples/srtpsrv.rs @@ -0,0 +1,79 @@ +extern crate clap; +extern crate fibers; +extern crate futures; +#[macro_use] +extern crate trackable; +extern crate rtp; + +use clap::{App, Arg}; +use fibers::{Spawn, Executor, InPlaceExecutor}; +use fibers::net::UdpSocket; +use fibers::net::futures::RecvFrom; +use futures::{Future, Poll, Async}; +use trackable::error::ErrorKindExt; +use rtp::{Error, ErrorKind}; +use rtp::traits::ReadPacket; +use rtp::rfc3550::RtpPacketReader; +use rtp::rfc3711::{SrtpPacketReader, SrtpContext}; + +fn main() { + let matches = App::new("srtpsrv") + .arg(Arg::with_name("PORT").short("p").takes_value(true).default_value("6000")) + .arg(Arg::with_name("MASTER_KEY").short("k").takes_value(true) + .default_value("d34d74f37d74e75f3bdb4f76f1bdf477")) + .arg(Arg::with_name("MASTER_SALT").short("s").takes_value(true) + .default_value("7f1fe35d78f77e75e79f7beb5f7a")) + .get_matches(); + let port = matches.value_of("PORT").unwrap(); + let addr = format!("0.0.0.0:{}", port).parse().unwrap(); + + let master_key = hex_str_to_bytes(matches.value_of("MASTER_KEY").unwrap()); + let master_salt = hex_str_to_bytes(matches.value_of("MASTER_SALT").unwrap()); + let context = SrtpContext::new(&master_key, &master_salt); + let future = track_err!(UdpSocket::bind(addr)).and_then(move |socket| { + SrtpRecvLoop::new(socket, context) + }); + + let mut executor = InPlaceExecutor::new().unwrap(); + let monitor = executor.spawn_monitor(future); + let result = executor.run_fiber(monitor).unwrap() + .map_err(|e| e.unwrap_or_else(|| ErrorKind::Other.cause("disconnected"))); + track_try_unwrap!(result); +} + +fn hex_str_to_bytes(s: &str) -> Vec { + use std::u8; + let mut bytes = Vec::new(); + for i in 0..s.len() / 2 { + let b = u8::from_str_radix(&s[i * 2..i * 2 + 2], 16).unwrap(); + bytes.push(b); + } + bytes +} + +struct SrtpRecvLoop { + future: RecvFrom>, + reader: SrtpPacketReader, +} +impl SrtpRecvLoop { + fn new(socket: UdpSocket, context: SrtpContext) -> Self { + let inner = RtpPacketReader; + SrtpRecvLoop { + future: socket.recv_from(vec![0; 4096]), + reader: SrtpPacketReader::new(context, inner), + } + } +} +impl Future for SrtpRecvLoop { + type Item = (); + type Error = Error; + fn poll(&mut self) -> Poll { + while let Async::Ready(v) = track_try!(self.future.poll().map_err(|e| e.2)) { + let (socket, buf, size, peer) = v; + let packet = track_try!(self.reader.read_packet(&mut &buf[..size])); + println!("Recv packet from {}: {:?}", peer, packet); + self.future = socket.recv_from(buf); + } + Ok(Async::NotReady) + } +} diff --git a/src/rfc5761.rs b/src/rfc5761.rs index 247bce1..d08a90d 100644 --- a/src/rfc5761.rs +++ b/src/rfc5761.rs @@ -1,6 +1,6 @@ use std::io::{Read, Write}; -use {Result}; +use Result; use traits::{ReadPacket, WritePacket, RtpPacket, RtcpPacket, Packet}; #[derive(Debug, Clone, PartialEq, Eq)] @@ -74,7 +74,7 @@ impl WritePacket for MuxPacketWriter fn write_packet(&mut self, writer: &mut W, packet: &Self::Packet) -> Result<()> { match *packet { MuxedPacket::Rtp(ref p) => self.rtp_writer.write_packet(writer, p), - MuxedPacket::Rtcp(ref p) => self.rtcp_writer.write_packet(writer, p), + MuxedPacket::Rtcp(ref p) => self.rtcp_writer.write_packet(writer, p), } } }