Compare commits

...

10 Commits

Author SHA1 Message Date
anon 1876a53a94 add stuff 2023-10-26 18:08:16 +03:00
Jonas Herzig 6c0223d8a4
Merge pull request #2 from Skgland/replace-rust-crypto2
replace unmaintained rust-crypto crate
2021-01-17 16:37:11 +01:00
Skgland 3f56dd92c8
fix sorting of non-optional dependencies 2020-11-30 11:19:52 +01:00
Skgland 2fe70b3900
replace unmaintained rust-crypto crate
using the `RustCrypto` crates instead of the `rust-crypto` crate as
mentioned in the `RustCrypto GitHub Org` Section of [this
Advisory](https://rustsec.org/advisories/RUSTSEC-2016-0005)
2020-11-09 13:27:00 +01:00
Jonas Herzig 1444b3c063 Update rfc5764 to use futures 0.3 AsyncRead/Write
With futures 0.3, the AsyncRead/Write traits are no longer marker traits, so we
can no longer easily support both, non-blocking and blocking APIs.
The code can however still be used in a blocking way buy just calling
`now_or_never` on any futures it returns, which will block if the underlying
streams are blocking.
2020-04-05 14:25:14 +02:00
Jonas Herzig 3510417ce4 Rust 2018 edition 2020-04-04 01:08:43 +02:00
Jonas Herzig ee8be93855 Fix panic in SRTP crypto when the most significant byte of IV is 0 2018-12-10 21:08:51 +01:00
Jonas Herzig a2cfaf2d3e Fix writing of SdesChunk (length is 8, not 16 bits) 2018-12-10 20:38:33 +01:00
Jonas Herzig 0a8ec94f7d Add DTLS-SRTP support (rfc5764)
https://tools.ietf.org/html/rfc5764
2018-12-02 18:47:24 +01:00
Jonas Herzig 6e21908845 Support adding not-yet-known SSRCs to an SRTP session 2018-12-02 13:53:47 +01:00
13 changed files with 958 additions and 84 deletions

View File

@ -2,15 +2,28 @@
name = "rtp"
version = "0.1.0"
authors = ["Takeru Ohta <phjgt308@gmail.com>"]
edition = "2018"
[features]
default = []
rfc5764-openssl = ["openssl", "tokio-openssl", "tokio-util/compat"]
[dependencies]
trackable = "0.1"
handy_async = "0.2"
rust-crypto = "0.2"
num = "0.1"
aes-ctr = "0.6.0"
async-trait = "0.1"
fixedbitset = "0.1"
futures = "0.3"
handy_async = "0.2"
hmac = "0.10.1"
num = "0.1"
sha-1 = "0.9.8"
trackable = "0.1"
openssl = { version = "0.10", optional = true }
tokio-openssl = { version = "0.4", optional = true }
tokio-util = { version = "0.3", optional = true, default-features = false }
[dev-dependencies]
clap = "2"
fibers = "0.1"
futures = "0.1"
futures01 = { package = "futures", version = "0.1" }

View File

@ -12,3 +12,4 @@ RFC
- AVPF: https://tools.ietf.org/html/rfc4585
- SAVPF: https://tools.ietf.org/html/rfc5124
- Multiplexing RTP and RTCP: https://tools.ietf.org/html/rfc5761
- DTLS-SRTP: https://tools.ietf.org/html/rfc5764

View File

@ -1,6 +1,6 @@
extern crate clap;
extern crate fibers;
extern crate futures;
extern crate futures01 as futures;
#[macro_use]
extern crate trackable;
extern crate rtp;

View File

@ -1,6 +1,7 @@
use std::io::{Read, Write};
use trackable::*;
use Result;
use crate::Result;
pub trait ReadFrom: Sized {
fn read_from<R: Read>(reader: &mut R) -> Result<Self>;

View File

@ -1,17 +1,16 @@
#[macro_use]
extern crate trackable;
extern crate crypto;
extern crate handy_async;
extern crate num;
extern crate fixedbitset;
#[cfg(feature = "openssl")]
extern crate openssl;
#[cfg(feature = "tokio")]
extern crate tokio;
pub use error::{Error, ErrorKind};
pub use crate::error::{Error, ErrorKind};
pub mod io;
pub mod rfc3550;
pub mod rfc3711;
pub mod rfc4585;
pub mod rfc5761;
pub mod rfc5764;
pub mod traits;
mod error;

View File

@ -1,11 +1,12 @@
use handy_async::sync_io::{ReadExt, WriteExt};
use std::io::{Read, Write};
use trackable::*;
use constants::RTP_VERSION;
use io::{ReadFrom, WriteTo};
use traits::{self, Packet};
use types::{NtpMiddleTimetamp, NtpTimestamp, RtpTimestamp, Ssrc, SsrcOrCsrc, U24, U5};
use {ErrorKind, Result};
use crate::constants::RTP_VERSION;
use crate::io::{ReadFrom, WriteTo};
use crate::traits::{self, Packet};
use crate::types::{NtpMiddleTimetamp, NtpTimestamp, RtpTimestamp, Ssrc, SsrcOrCsrc, U24, U5};
use crate::{ErrorKind, Result};
pub const RTCP_PACKET_TYPE_SR: u8 = 200;
pub const RTCP_PACKET_TYPE_RR: u8 = 201;
@ -510,10 +511,10 @@ impl WriteTo for SdesChunk {
write_bytes += 1;
let text = item.text();
track_assert!(text.len() <= 0xFFFF, ErrorKind::Invalid);
track_try!(writer.write_u16be(text.len() as u16));
track_assert!(text.len() <= 0xFF, ErrorKind::Invalid);
track_try!(writer.write_u8(text.len() as u8));
track_try!(writer.write_all(text.as_bytes()));
write_bytes += 2 + text.len();
write_bytes += 1 + text.len();
}
track_try!(writer.write_u8(SDES_ITEM_TYPE_END));
write_bytes += 1;

View File

@ -1,11 +1,12 @@
use handy_async::sync_io::{ReadExt, WriteExt};
use std::io::{Read, Write};
use trackable::*;
use constants::RTP_VERSION;
use io::{ReadFrom, WriteTo};
use traits::{self, Packet};
use types::{Csrc, RtpTimestamp, Ssrc, U7};
use {ErrorKind, Result};
use crate::constants::RTP_VERSION;
use crate::io::{ReadFrom, WriteTo};
use crate::traits::{self, Packet};
use crate::types::{Csrc, RtpTimestamp, Ssrc, U7};
use crate::{ErrorKind, Result};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RtpPacketReader;

View File

@ -1,16 +1,19 @@
use crypto;
// FIXME: saveguard against two-time pad by running replay-protection on outgoing packets
use fixedbitset::FixedBitSet;
use handy_async::sync_io::{ReadExt, WriteExt};
use num::BigUint;
use num::{BigUint, Integer};
use std::borrow::Cow;
use std::collections::BTreeMap;
use std::io::{Read, Write};
use trackable::*;
use io::{ReadFrom, WriteTo};
use rfc3550;
use traits::{ReadPacket, RtcpPacket, RtpPacket, WritePacket};
use types::{Ssrc, U48};
use {ErrorKind, Result};
use crate::io::{ReadFrom, WriteTo};
use crate::rfc3550;
use crate::traits::{ReadPacket, RtcpPacket, RtpPacket, WritePacket};
use crate::types::{Ssrc, U48};
use crate::{ErrorKind, Result};
use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher};
use hmac::{Mac, NewMac};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncryptionAlgorithm {
@ -85,6 +88,7 @@ pub struct Context<P: Protocol> {
pub encryption: EncryptionAlgorithm,
pub authentication: AuthenticationAlgorithm,
pub auth_tag_len: usize,
pub unknown_ssrcs: usize,
pub ssrc_context: BTreeMap<u32, SsrcContext<P>>,
}
@ -357,6 +361,7 @@ impl Protocol for Srtcp {
impl<P: Protocol> Context<P>
where
u64: From<P::PacketIndex>,
P::PacketIndex: Into<BigUint>,
{
pub fn new(master_key: &[u8], master_salt: &[u8]) -> Self {
Context {
@ -366,6 +371,7 @@ where
encryption: EncryptionAlgorithm::default(),
authentication: AuthenticationAlgorithm::default(),
auth_tag_len: 80 / 8,
unknown_ssrcs: 0,
ssrc_context: BTreeMap::new(),
}
}
@ -386,6 +392,10 @@ where
);
}
pub fn add_unknown_ssrcs(&mut self, count: usize) {
self.unknown_ssrcs += count;
}
pub fn update_session_keys(&mut self, ssrc: Ssrc, index: P::PacketIndex) {
let index = if self.key_derivation_rate == 0 {
0
@ -464,19 +474,18 @@ where
let iv = BigUint::from_bytes_be(&context.session_salt_key) << 16;
let iv = iv ^ (BigUint::from(ssrc) << 64);
let iv = iv ^ (index.into() << 16);
let iv = &iv.to_bytes_be()[0..context.session_encr_key.len()];
let iv = iv ^ (BigUint::from(1_u8) << (context.session_encr_key.len() * 8));
let iv = &iv.to_bytes_be()[1..context.session_encr_key.len() + 1];
let mut ctr =
aes_ctr::Aes128Ctr::new_var(&context.session_encr_key, iv).expect("Correct Key Length");
let mut ctr = crypto::aes::ctr(
crypto::aes::KeySize::KeySize128,
&context.session_encr_key,
iv,
);
let block_size = context.session_encr_key.len();
for block in encrypted.chunks(block_size) {
let old_len = decrypted.len();
decrypted.resize(old_len + block.len(), 0);
ctr.process(block, &mut decrypted[old_len..]);
decrypted.extend_from_slice(block);
ctr.apply_keystream(&mut decrypted[old_len..]);
}
}
pub fn decrypt(
@ -499,19 +508,17 @@ where
let iv = BigUint::from_bytes_be(&context.session_salt_key) << 16;
let iv = iv ^ (BigUint::from(ssrc) << 64);
let iv = iv ^ (index.into() << 16);
let iv = &iv.to_bytes_be()[0..context.session_encr_key.len()];
let iv = iv ^ (BigUint::from(1_u8) << (context.session_encr_key.len() * 8));
let iv = &iv.to_bytes_be()[1..context.session_encr_key.len() + 1];
let mut ctr = crypto::aes::ctr(
crypto::aes::KeySize::KeySize128,
&context.session_encr_key,
iv,
);
let mut ctr =
aes_ctr::Aes128Ctr::new_var(&context.session_encr_key, iv).expect("Correct Key Length");
let block_size = context.session_encr_key.len();
for block in plaintext.chunks(block_size) {
let old_len = encrypted.len();
encrypted.resize(old_len + block.len(), 0);
ctr.process(block, &mut encrypted[old_len..]);
encrypted.extend_from_slice(block);
ctr.apply_keystream(&mut encrypted[old_len..]);
}
}
pub fn encrypt(
@ -528,7 +535,24 @@ where
pub fn process_incoming(&mut self, packet: &[u8]) -> Result<Vec<u8>> {
// Step 1: determining the correct context
let ssrc = track_try!(P::read_ssrc(packet));
track_assert!(self.ssrc_context.contains_key(&ssrc), ErrorKind::Invalid, "Unknown SSRC {}", ssrc);
if !self.ssrc_context.contains_key(&ssrc) {
track_assert!(
self.unknown_ssrcs > 0,
ErrorKind::Invalid,
"Unknown SSRC {}",
ssrc
);
self.unknown_ssrcs -= 1;
let ssrc_context = SsrcContext {
replay_window_head: 0,
replay_window: FixedBitSet::with_capacity(128),
session_encr_key: vec![0; 128 / 8],
session_salt_key: vec![0; 112 / 8],
session_auth_key: vec![0; 160 / 8],
protocol_specific: P::default(),
};
self.ssrc_context.insert(ssrc, ssrc_context);
}
// Step 2: Determine index of the packet
let index = track_try!(P::determine_incoming_packet_index(
@ -600,7 +624,24 @@ where
pub fn process_outgoing(&mut self, packet: &[u8]) -> Result<Vec<u8>> {
// Step 1: determining the correct context
let ssrc = track_try!(P::read_ssrc(packet));
track_assert!(self.ssrc_context.contains_key(&ssrc), ErrorKind::Invalid, "Unknown SSRC {}", ssrc);
if !self.ssrc_context.contains_key(&ssrc) {
track_assert!(
self.unknown_ssrcs > 0,
ErrorKind::Invalid,
"Unknown SSRC {}",
ssrc
);
self.unknown_ssrcs -= 1;
let ssrc_context = SsrcContext {
replay_window_head: 0,
replay_window: FixedBitSet::with_capacity(128),
session_encr_key: vec![0; 128 / 8],
session_salt_key: vec![0; 112 / 8],
session_auth_key: vec![0; 160 / 8],
protocol_specific: P::default(),
};
self.ssrc_context.insert(ssrc, ssrc_context);
}
// Step 2: Determine index of the packet
let index = track_try!(P::determine_outgoing_packet_index(
@ -768,28 +809,26 @@ where
}
fn hmac_hash_sha1(key: &[u8], data: &[u8]) -> Vec<u8> {
use crypto::mac::Mac;
let mut hmac = crypto::hmac::Hmac::new(crypto::sha1::Sha1::new(), key);
hmac.input(data);
Vec::from(hmac.result().code())
let mut hmac = hmac::Hmac::<sha1::Sha1>::new_varkey(key).expect("Correct Key Length");
hmac.update(data);
hmac.finalize().into_bytes().to_vec()
}
fn prf_n(master_key: &[u8], x: BigUint, n: usize) -> Vec<u8> {
// https://tools.ietf.org/html/rfc3711#section-4.1.1
let mut output = Vec::new();
let mut ctr = crypto::aes::ctr(
crypto::aes::KeySize::KeySize128,
master_key,
&(x << 16).to_bytes_be(),
);
let mut ctr = aes_ctr::Aes128Ctr::new_var(master_key, &(x << 16).to_bytes_be())
.expect("Correct Key Length");
output.reserve_exact(n.next_multiple_of(&16));
for i in 0.. {
let old_len = output.len();
let new_len = output.len() + 16;
output.resize(new_len, 0);
let mut input = [0; 16];
(&mut input[8..]).write_u64be(i).unwrap();
ctr.process(&input[..], &mut output[old_len..]);
output.extend_from_slice(&[0u8; 8]);
output.extend_from_slice(&u64::to_be_bytes(i));
ctr.apply_keystream(&mut output[old_len..]);
if output.len() >= n {
break;
}
@ -799,10 +838,10 @@ fn prf_n(master_key: &[u8], x: BigUint, n: usize) -> Vec<u8> {
}
#[cfg(test)]
mod test {
pub(crate) mod test {
use super::*;
use rfc3550;
use rfc4585;
use crate::rfc3550;
use crate::rfc4585;
#[test]
fn rtp_packet_index_estimation_works() {
@ -825,14 +864,14 @@ mod test {
assert_eq!(estimate(&context, 10001), i(roc_p1, 10001)); // roc+1
}
const TEST_MASTER_KEY: &[u8] = &[
pub(crate) const TEST_MASTER_KEY: &[u8] = &[
211, 77, 116, 243, 125, 116, 231, 95, 59, 219, 79, 118, 241, 189, 244, 119,
];
const TEST_MASTER_SALT: &[u8] = &[
pub(crate) const TEST_MASTER_SALT: &[u8] = &[
127, 31, 227, 93, 120, 247, 126, 117, 231, 159, 123, 235, 95, 122,
];
const TEST_SRTP_SSRC: Ssrc = 446919554;
const TEST_SRTP_PACKET: &[u8] = &[
pub(crate) const TEST_SRTP_SSRC: Ssrc = 446919554;
pub(crate) const TEST_SRTP_PACKET: &[u8] = &[
128, 0, 3, 92, 222, 161, 6, 76, 26, 163, 115, 130, 222, 0, 143, 87, 0, 227, 123, 91, 200,
238, 141, 220, 9, 191, 52, 111, 100, 62, 220, 158, 211, 79, 184, 199, 79, 182, 9, 248, 170,
82, 125, 152, 143, 206, 8, 152, 80, 207, 27, 183, 141, 77, 33, 60, 101, 180, 210, 146, 139,
@ -844,8 +883,8 @@ mod test {
7, 52, 191, 129, 239, 86, 78, 172, 229, 178, 112, 22, 125, 191, 164, 17, 193, 24, 152, 197,
146, 94, 74, 156, 171, 245, 239, 220, 205, 145, 206,
];
const TEST_SRTCP_SSRC: Ssrc = 3270675037;
const TEST_SRTCP_PACKET: &[u8] = &[
pub(crate) const TEST_SRTCP_SSRC: Ssrc = 3270675037;
pub(crate) const TEST_SRTCP_PACKET: &[u8] = &[
128, 201, 0, 1, 194, 242, 138, 93, 177, 31, 99, 88, 187, 209, 173, 181, 135, 18, 79, 59,
119, 153, 115, 34, 75, 94, 96, 29, 32, 14, 118, 86, 145, 159, 203, 174, 225, 34, 196, 229,
39, 22, 174, 54, 198, 56, 179, 171, 111, 229, 48, 234, 138, 249, 127, 11, 86, 94, 40, 213,

View File

@ -1,12 +1,13 @@
use handy_async::sync_io::{ReadExt, WriteExt};
use std::io::{Read, Write};
use trackable::*;
use constants::RTP_VERSION;
use io::{ReadFrom, WriteTo};
use rfc3550;
use traits::{self, Packet};
use types::{Ssrc, U13, U5, U6, U7};
use {ErrorKind, Result};
use crate::constants::RTP_VERSION;
use crate::io::{ReadFrom, WriteTo};
use crate::rfc3550;
use crate::traits::{self, Packet};
use crate::types::{Ssrc, U13, U5, U6, U7};
use crate::{ErrorKind, Result};
pub const RTCP_PACKET_TYPE_RTPFB: u8 = 205;
pub const RTCP_PACKET_TYPE_PSFB: u8 = 206;

View File

@ -1,7 +1,8 @@
use std::io::{Read, Write};
use trackable::*;
use traits::{Packet, ReadPacket, RtcpPacket, RtpPacket, WritePacket};
use Result;
use crate::traits::{Packet, ReadPacket, RtcpPacket, RtpPacket, WritePacket};
use crate::Result;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MuxPacketReader<T, U> {

679
src/rfc5764/mod.rs Normal file
View File

@ -0,0 +1,679 @@
// FIXME: the current SRTP implementation does not support the maximum_lifetime parameter
#[cfg(feature = "rfc5764-openssl")]
mod openssl;
use async_trait::async_trait;
use futures::io::{AsyncRead, AsyncWrite};
use futures::ready;
use futures::{Sink, Stream};
use std::collections::VecDeque;
use std::io;
use std::io::Read;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::Mutex;
use std::task::Context;
use std::task::Poll;
use crate::rfc3711::{
AuthenticationAlgorithm, Context as SrtpContext, EncryptionAlgorithm, Srtcp, Srtp,
};
use crate::types::Ssrc;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SrtpProtectionProfile {
pub name: &'static str,
pub cipher: EncryptionAlgorithm,
pub cipher_key_length: u8,
pub cipher_salt_length: u8,
pub maximum_lifetime: u32,
pub auth_function: AuthenticationAlgorithm,
pub auth_key_length: u8,
pub auth_salt_length: u8,
}
impl SrtpProtectionProfile {
pub const AES128_CM_HMAC_SHA1_80: SrtpProtectionProfile = SrtpProtectionProfile {
name: "SRTP_AES128_CM_SHA1_80",
cipher: EncryptionAlgorithm::AesCm,
cipher_key_length: 128,
cipher_salt_length: 112,
maximum_lifetime: 2 ^ 31,
auth_function: AuthenticationAlgorithm::HmacSha1,
auth_key_length: 160,
auth_salt_length: 80,
};
// AES128_CM_HMAC_SHA1_32 is not supported due to recommendation in rfc3711#5.2
// NULL_HMAC_SHA1_80 is not supported because the NULL cipher isn't implemented
// NULL_HMAC_SHA1_32 is not supported due to recommendation in rfc3711#5.2 (and lack of NULL)
pub const RECOMMENDED: &'static [&'static SrtpProtectionProfile] =
&[&SrtpProtectionProfile::AES128_CM_HMAC_SHA1_80];
pub const SUPPORTED: &'static [&'static SrtpProtectionProfile] =
&[&SrtpProtectionProfile::AES128_CM_HMAC_SHA1_80];
}
#[async_trait]
pub trait DtlsBuilder<S> {
type Instance: Dtls<S>;
async fn handshake(self, stream: S) -> Result<Self::Instance, io::Error>
where
S: AsyncRead + AsyncWrite + Unpin + 'async_trait;
}
pub trait Dtls<S>: AsyncRead + AsyncWrite + Unpin {
fn is_server_side(&self) -> bool;
fn export_key(&mut self, exporter_label: &str, length: usize) -> Vec<u8>;
}
pub struct DtlsSrtpMuxer<S> {
inner: S,
dtls_buf: VecDeque<Vec<u8>>,
srtp_buf: VecDeque<Vec<u8>>,
}
impl<S: AsyncRead + AsyncWrite> DtlsSrtpMuxer<S> {
fn new(inner: S) -> Self {
DtlsSrtpMuxer {
inner,
dtls_buf: VecDeque::new(),
srtp_buf: VecDeque::new(),
}
}
}
impl<S> DtlsSrtpMuxer<S> {
fn into_parts(self) -> (DtlsSrtpMuxerPart<S>, DtlsSrtpMuxerPart<S>) {
let muxer = Arc::new(Mutex::new(self));
let dtls = DtlsSrtpMuxerPart {
muxer: muxer.clone(),
srtp: false,
};
let srtp = DtlsSrtpMuxerPart { muxer, srtp: true };
(dtls, srtp)
}
}
impl<S: AsyncRead + Unpin> DtlsSrtpMuxer<S> {
fn read(
&mut self,
cx: &mut Context,
want_srtp: bool,
dst_buf: &mut [u8],
) -> Poll<io::Result<usize>> {
{
let want_buf = if want_srtp {
&mut self.srtp_buf
} else {
&mut self.dtls_buf
};
if let Some(buf) = want_buf.pop_front() {
return Poll::Ready((&buf[..]).read(dst_buf));
}
}
let mut buf = [0u8; 2048];
let len = ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf))?;
if len == 0 {
return Poll::Ready(Ok(0));
}
let mut buf = &buf[..len];
// Demux SRTP and DTLS as per https://tools.ietf.org/html/rfc5764#section-5.1.2
let is_srtp = buf[0] >= 128 && buf[0] <= 191;
if is_srtp == want_srtp {
Poll::Ready(buf.read(dst_buf))
} else {
if is_srtp {
&mut self.srtp_buf
} else {
&mut self.dtls_buf
}
.push_back(buf.to_vec());
// We have to make sure we're not waiting for, e.g., a srtp packet when
// we just got a dtls packet and the remote is waiting on a reply to it.
// So, to prevent this kind of deadlock, we abort the current read-path
// by pretending that we're doing non-blocking io (even if we aren't)
// to get back to where we can enter the other (in the example: the dtls)
// read-path and process the packet we just read.
Poll::Pending // FIXME this doesn't see sound, shouldn't we store the waker!?
}
}
}
pub struct DtlsSrtpMuxerPart<S> {
muxer: Arc<Mutex<DtlsSrtpMuxer<S>>>,
srtp: bool,
}
impl<S> AsyncRead for DtlsSrtpMuxerPart<S>
where
S: AsyncRead + Unpin,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.muxer.lock().unwrap().read(cx, self.srtp, buf)
}
}
impl<S> AsyncWrite for DtlsSrtpMuxerPart<S>
where
S: AsyncWrite + Unpin,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.muxer.lock().unwrap().inner).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.muxer.lock().unwrap().inner).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
ready!(self.as_mut().poll_flush(cx))?;
Pin::new(&mut self.muxer.lock().unwrap().inner).poll_close(cx)
}
}
pub struct DtlsSrtp<S: AsyncRead + AsyncWrite, D: DtlsBuilder<DtlsSrtpMuxerPart<S>>> {
stream: DtlsSrtpMuxerPart<S>,
#[allow(dead_code)] // we'll need this once we implement re-keying
dtls: D::Instance,
srtp_read_context: SrtpContext<Srtp>,
srtcp_read_context: SrtpContext<Srtcp>,
srtp_write_context: SrtpContext<Srtp>,
srtcp_write_context: SrtpContext<Srtcp>,
sink_buf: Option<Vec<u8>>,
}
impl<S, D> DtlsSrtp<S, D>
where
S: AsyncRead + AsyncWrite + Unpin,
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
{
pub async fn handshake(stream: S, dtls_builder: D) -> Result<DtlsSrtp<S, D>, io::Error> {
let (stream_dtls, stream_srtp) = DtlsSrtpMuxer::new(stream).into_parts();
let dtls = dtls_builder.handshake(stream_dtls).await?;
Ok(DtlsSrtp::new(stream_srtp, dtls))
}
fn new(stream: DtlsSrtpMuxerPart<S>, mut dtls: D::Instance) -> Self {
const EXPORTER_LABEL: &str = "EXTRACTOR-dtls_srtp";
const KEY_LEN: usize = 16;
const SALT_LEN: usize = 14;
const EXPORT_LEN: usize = (KEY_LEN + SALT_LEN) * 2;
let key_material = dtls.export_key(EXPORTER_LABEL, EXPORT_LEN);
let client_material = (
&(&key_material[0..])[..KEY_LEN],
&(&key_material[KEY_LEN * 2..])[..SALT_LEN],
);
let server_material = (
&(&key_material[KEY_LEN..])[..KEY_LEN],
&(&key_material[KEY_LEN * 2 + SALT_LEN..])[..SALT_LEN],
);
let (read_material, write_material) = if dtls.is_server_side() {
(client_material, server_material)
} else {
(server_material, client_material)
};
let (read_key, read_salt) = read_material;
let (write_key, write_salt) = write_material;
DtlsSrtp {
stream,
dtls,
srtp_read_context: SrtpContext::new(&read_key, &read_salt),
srtcp_read_context: SrtpContext::new(&read_key, &read_salt),
srtp_write_context: SrtpContext::new(&write_key, &write_salt),
srtcp_write_context: SrtpContext::new(&write_key, &write_salt),
sink_buf: None,
}
}
pub fn add_incoming_ssrc(&mut self, ssrc: Ssrc) {
self.srtp_read_context.add_ssrc(ssrc);
self.srtcp_read_context.add_ssrc(ssrc);
}
pub fn add_incoming_unknown_ssrcs(&mut self, count: usize) {
self.srtp_read_context.add_unknown_ssrcs(count);
self.srtcp_read_context.add_unknown_ssrcs(count);
}
pub fn add_outgoing_ssrc(&mut self, ssrc: Ssrc) {
self.srtp_write_context.add_ssrc(ssrc);
self.srtcp_write_context.add_ssrc(ssrc);
}
pub fn add_outgoing_unknown_ssrcs(&mut self, count: usize) {
self.srtp_write_context.add_unknown_ssrcs(count);
self.srtcp_write_context.add_unknown_ssrcs(count);
}
fn process_incoming_srtp_packet(&mut self, buf: &[u8]) -> Option<Vec<u8>> {
// Demux SRTP and SRTCP packets as per https://tools.ietf.org/html/rfc5761#section-4
let payload_type = buf[1] & 0x7f;
if 64 <= payload_type && payload_type <= 95 {
self.srtcp_read_context.process_incoming(buf).ok()
} else {
self.srtp_read_context.process_incoming(buf).ok()
}
}
fn process_outgoing_srtp_packet(&mut self, buf: &[u8]) -> Option<Vec<u8>> {
// Demux SRTP and SRTCP packets as per https://tools.ietf.org/html/rfc5761#section-4
let payload_type = buf[1] & 0x7f;
if 64 <= payload_type && payload_type <= 95 {
self.srtcp_write_context.process_outgoing(buf).ok()
} else {
self.srtp_write_context.process_outgoing(buf).ok()
}
}
}
impl<S, D> AsyncRead for DtlsSrtp<S, D>
where
S: AsyncRead + AsyncWrite + Unpin,
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
{
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
let item = ready!(self.poll_next(cx)?);
if let Some(item) = item {
Poll::Ready((&item[..]).read(buf))
} else {
Poll::Ready(Ok(0))
}
}
}
impl<S, D> Stream for DtlsSrtp<S, D>
where
S: AsyncRead + AsyncWrite + Unpin,
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
{
type Item = io::Result<Vec<u8>>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
// Check if we have an SRTP packet in the queue
if self.stream.muxer.lock().unwrap().srtp_buf.is_empty() {
// if we don't, then poll the dtls layer which will read from the
// underlying packet stream and produce either dtls data or fill
// the SRTP packet queue or fail due to WouldBlock
/* FIXME polling dtls eventually errs with a read timeout, for some reason
* it does indeed send repeated "Change Cipher Spec" and "Encrypted Handshake
* Message" as if its expecting a response to those but none is sent by FF?
match self.dtls.read(buf) {
Ok(len) => return Ok(len),
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
// Either we're using non-blocking io and there's no more data
// available, or we received an SRTP packet which needs handling
}
err => return err,
};
*/
}
// Read and handle the next SRTP packet from the queue
let mut raw_buf = [0u8; 2048];
let len = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut raw_buf))?;
if len == 0 {
return Poll::Ready(None);
}
let raw_buf = &raw_buf[..len];
return match self.process_incoming_srtp_packet(raw_buf) {
Some(result) => Poll::Ready(Some(Ok(result))),
None => {
// FIXME: check rfc for whether this should be dropped silently
continue; // packet failed to auth or decrypt, drop it and try the next one
}
};
}
}
}
impl<S, D> AsyncWrite for DtlsSrtp<S, D>
where
S: AsyncRead + AsyncWrite + Unpin,
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
{
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if let Some(buf) = self.process_outgoing_srtp_packet(buf) {
Pin::new(&mut self.stream).poll_write(cx, &buf)
} else {
Poll::Ready(Ok(buf.len()))
}
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_close(cx)
}
}
impl<S, D> Sink<&[u8]> for DtlsSrtp<S, D>
where
S: AsyncRead + AsyncWrite + Unpin,
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
{
type Error = io::Error;
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
let _ = Sink::poll_flush(self.as_mut(), cx)?;
if self.sink_buf.is_none() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
fn start_send(mut self: Pin<&mut Self>, item: &[u8]) -> io::Result<()> {
self.sink_buf = self.process_outgoing_srtp_packet(item.as_ref());
Ok(())
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
if let Some(buf) = self.sink_buf.take() {
match Pin::new(&mut self.stream).poll_write(cx, &buf) {
Poll::Pending => {
self.sink_buf = Some(buf);
return Poll::Pending;
}
_ => {}
}
}
Pin::new(&mut self.stream).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_close(cx)
}
}
#[cfg(test)]
pub(crate) mod test {
use super::*;
use crate::rfc3711::test::{
TEST_MASTER_KEY, TEST_MASTER_SALT, TEST_SRTCP_PACKET, TEST_SRTCP_SSRC, TEST_SRTP_PACKET,
TEST_SRTP_SSRC,
};
use futures::{AsyncReadExt, AsyncWriteExt, FutureExt};
macro_rules! read_now {
( $expr:expr, $buf:expr ) => {
$expr
.read($buf)
.now_or_never()
.expect("would block")
.expect("reading")
};
}
macro_rules! write_now {
( $expr:expr, $buf:expr ) => {
$expr
.write($buf)
.now_or_never()
.expect("would block")
.expect("writing")
};
}
struct DummyDtlsBuilder;
struct DummyDtls<S> {
connected: bool,
stream: S,
}
const DUMMY_DTLS_NOOP: &[u8] = &[20, 42];
const DUMMY_DTLS_HELLO: &[u8] = &[62, 42];
const DUMMY_DTLS_CONNECTED: &[u8] = &[63, 42];
impl DummyDtlsBuilder {
fn new() -> Self {
DummyDtlsBuilder {}
}
}
#[async_trait]
impl<S: AsyncRead + AsyncWrite + Unpin + Send> DtlsBuilder<S> for DummyDtlsBuilder {
type Instance = DummyDtls<S>;
async fn handshake(self, mut stream: S) -> Result<Self::Instance, io::Error>
where
S: 'async_trait,
{
let _ = stream.write(DUMMY_DTLS_HELLO).await;
let mut dtls = DummyDtls {
stream,
connected: false,
};
loop {
let _ = futures::poll!(dtls.read(&mut []));
if dtls.connected {
break;
} else {
futures::pending!();
}
}
Ok(dtls)
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> Dtls<S> for DummyDtls<S> {
fn is_server_side(&self) -> bool {
true
}
fn export_key(&mut self, exporter_label: &str, length: usize) -> Vec<u8> {
assert_eq!(exporter_label, "EXTRACTOR-dtls_srtp");
let mut buf = Vec::new();
buf.extend(TEST_MASTER_KEY);
buf.extend(TEST_MASTER_KEY);
let idx = buf.len() - 1;
buf[idx] = 0;
buf.extend(TEST_MASTER_SALT);
buf.extend(TEST_MASTER_SALT);
let idx = buf.len() - 1;
buf[idx] = 0;
assert_eq!(length, buf.len());
buf
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for DummyDtls<S> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context,
_dst: &mut [u8],
) -> Poll<io::Result<usize>> {
loop {
let mut buf = [0u8; 2048];
let len = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut buf))?;
assert_eq!(len, 2);
assert_eq!(buf[1], 42);
match &buf[..len] {
DUMMY_DTLS_NOOP => {}
DUMMY_DTLS_HELLO => {
let _ = Pin::new(&mut self.stream).poll_write(cx, DUMMY_DTLS_CONNECTED)?;
}
DUMMY_DTLS_CONNECTED => {
self.connected = true;
}
_ => panic!(),
};
}
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for DummyDtls<S> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.stream).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
Pin::new(&mut self.stream).poll_flush(cx)
}
}
type PacketBufArc = Arc<Mutex<VecDeque<Vec<u8>>>>;
pub(crate) struct DummyTransport {
read_buf: PacketBufArc,
write_buf: PacketBufArc,
}
impl DummyTransport {
pub fn new() -> (Self, Self) {
let read_buf = Arc::new(Mutex::new(VecDeque::new()));
let write_buf = Arc::new(Mutex::new(VecDeque::new()));
(
DummyTransport {
read_buf: read_buf.clone(),
write_buf: write_buf.clone(),
},
DummyTransport {
read_buf: write_buf.clone(),
write_buf: read_buf.clone(),
},
)
}
pub fn read_packet(&mut self) -> Option<Vec<u8>> {
self.read_buf.lock().unwrap().pop_front()
}
}
impl AsyncRead for DummyTransport {
fn poll_read(
self: Pin<&mut Self>,
_cx: &mut Context,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
match self.read_buf.lock().unwrap().pop_front() {
None => Poll::Pending,
Some(elem) => Poll::Ready(std::io::Read::read(&mut &elem[..], buf)),
}
}
}
impl AsyncWrite for DummyTransport {
fn poll_write(
self: Pin<&mut Self>,
_cx: &mut Context,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.write_buf.lock().unwrap().push_back(buf.to_vec());
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
Poll::Ready(Ok(()))
}
}
fn new_dtls_srtp() -> (DummyTransport, DtlsSrtp<DummyTransport, DummyDtlsBuilder>) {
let (mut stream, dummy_stream) = DummyTransport::new();
write_now!(stream, DUMMY_DTLS_CONNECTED);
let mut dtls_srtp = DtlsSrtp::handshake(dummy_stream, DummyDtlsBuilder::new())
.now_or_never()
.expect("DTLS-SRTP handshake did not complete")
.expect("DTL-SRTP handshake failed");
assert_eq!(&stream.read_packet().unwrap()[..], DUMMY_DTLS_HELLO);
dtls_srtp.add_incoming_ssrc(TEST_SRTP_SSRC);
dtls_srtp.add_incoming_ssrc(TEST_SRTCP_SSRC);
dtls_srtp.add_outgoing_ssrc(TEST_SRTP_SSRC);
dtls_srtp.add_outgoing_ssrc(TEST_SRTCP_SSRC);
(stream, dtls_srtp)
}
#[test]
fn polls_dtls_layer_for_keys() {
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
let (mut stream, dummy_stream) = DummyTransport::new();
let mut handshake = DtlsSrtp::handshake(dummy_stream, DummyDtlsBuilder::new()).boxed();
match handshake.as_mut().poll(&mut cx) {
Poll::Pending => {}
_ => panic!(),
};
// too early, should be discarded
write_now!(stream, TEST_SRTP_PACKET);
match handshake.as_mut().poll(&mut cx) {
Poll::Pending => {}
_ => panic!(),
};
assert_eq!(&stream.read_packet().unwrap()[..], DUMMY_DTLS_HELLO);
write_now!(stream, DUMMY_DTLS_HELLO);
match handshake.as_mut().poll(&mut cx) {
Poll::Pending => {}
_ => panic!(),
};
assert_eq!(&stream.read_packet().unwrap()[..], DUMMY_DTLS_CONNECTED);
write_now!(stream, DUMMY_DTLS_CONNECTED);
match handshake.as_mut().poll(&mut cx) {
Poll::Ready(_) => {}
_ => panic!(),
};
}
#[test]
fn decryption_of_incoming_srtp_and_srtcp_works() {
let mut buf = [0u8; 2048];
let (mut stream, mut dtls_srtp) = new_dtls_srtp();
write_now!(stream, TEST_SRTP_PACKET);
write_now!(stream, TEST_SRTCP_PACKET);
assert_eq!(read_now!(dtls_srtp, &mut buf), 182); // srtp
assert_eq!(read_now!(dtls_srtp, &mut buf), 68); // srtcp
}
#[test]
fn does_not_allow_replay_of_packets() {
let mut buf = [0u8; 2048];
let (mut stream, mut dtls_srtp) = new_dtls_srtp();
write_now!(stream, TEST_SRTP_PACKET);
write_now!(stream, TEST_SRTP_PACKET);
write_now!(stream, TEST_SRTP_PACKET);
assert_eq!(read_now!(dtls_srtp, &mut buf), 182);
assert!(dtls_srtp.read(&mut buf).now_or_never().is_none(),);
write_now!(stream, TEST_SRTCP_PACKET);
write_now!(stream, TEST_SRTCP_PACKET);
write_now!(stream, TEST_SRTCP_PACKET);
assert_eq!(read_now!(dtls_srtp, &mut buf), 68);
assert!(dtls_srtp.read(&mut buf).now_or_never().is_none(),);
}
}

138
src/rfc5764/openssl.rs Normal file
View File

@ -0,0 +1,138 @@
use async_trait::async_trait;
use futures::io::{AsyncRead, AsyncWrite};
use std::io;
use tokio_openssl::SslStream;
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, Tokio02AsyncReadCompatExt};
use openssl::ssl::{ConnectConfiguration, SslAcceptorBuilder};
use crate::rfc5764::{Dtls, DtlsBuilder, SrtpProtectionProfile};
type CompatSslStream<S> = Compat<SslStream<Compat<S>>>;
#[async_trait]
impl<S: AsyncRead + AsyncWrite + Send + Unpin> DtlsBuilder<S> for ConnectConfiguration {
type Instance = CompatSslStream<S>;
async fn handshake(mut self, stream: S) -> Result<Self::Instance, io::Error>
where
S: 'async_trait,
{
let profiles_str: String = SrtpProtectionProfile::RECOMMENDED
.iter()
.map(|profile| profile.name.to_string())
.collect::<Vec<_>>()
.join(":");
self.set_tlsext_use_srtp(&profiles_str).unwrap();
match tokio_openssl::connect(self, "invalid", stream.compat()).await {
Ok(stream) => Ok(stream.compat()),
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "handshake error")),
}
}
}
#[async_trait]
impl<S: AsyncRead + AsyncWrite + Send + Unpin> DtlsBuilder<S> for SslAcceptorBuilder {
type Instance = CompatSslStream<S>;
async fn handshake(mut self, stream: S) -> Result<Self::Instance, io::Error>
where
S: 'async_trait,
{
let profiles_str: String = SrtpProtectionProfile::RECOMMENDED
.iter()
.map(|profile| profile.name.to_string())
.collect::<Vec<_>>()
.join(":");
self.set_tlsext_use_srtp(&profiles_str).unwrap();
match tokio_openssl::accept(&self.build(), stream.compat()).await {
Ok(stream) => Ok(stream.compat()),
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "handshake error")),
}
}
}
impl<S: AsyncRead + AsyncWrite + Unpin> Dtls<S> for Compat<SslStream<Compat<S>>> {
fn is_server_side(&self) -> bool {
self.get_ref().ssl().is_server()
}
fn export_key(&mut self, exporter_label: &str, length: usize) -> Vec<u8> {
let mut vec = vec![0; length];
self.get_mut()
.ssl()
.export_keying_material(&mut vec, exporter_label, None)
.unwrap();
vec
}
}
#[cfg(test)]
mod test {
use crate::rfc5764::test::DummyTransport;
use crate::rfc5764::DtlsSrtp;
use futures::FutureExt;
use openssl::asn1::Asn1Time;
use openssl::hash::MessageDigest;
use openssl::pkey::PKey;
use openssl::rsa::Rsa;
use openssl::ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode};
use openssl::x509::X509;
use std::task::{Context, Poll};
#[test]
fn connect_and_establish_matching_key_material() {
let (client_sock, server_sock) = DummyTransport::new();
let rsa = Rsa::generate(2048).unwrap();
let key = PKey::from_rsa(rsa).unwrap();
let mut cert_builder = X509::builder().unwrap();
cert_builder
.set_not_after(&Asn1Time::days_from_now(1).unwrap())
.unwrap();
cert_builder
.set_not_before(&Asn1Time::days_from_now(0).unwrap())
.unwrap();
cert_builder.set_pubkey(&key).unwrap();
cert_builder.sign(&key, MessageDigest::sha256()).unwrap();
let cert = cert_builder.build();
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::dtls()).unwrap();
let mut connector = SslConnector::builder(SslMethod::dtls()).unwrap();
acceptor.set_certificate(&cert).unwrap();
acceptor.set_private_key(&key).unwrap();
acceptor.set_verify(SslVerifyMode::NONE);
connector.set_verify(SslVerifyMode::NONE);
let handshake_server = DtlsSrtp::handshake(server_sock, acceptor);
let handshake_client =
DtlsSrtp::handshake(client_sock, connector.build().configure().unwrap());
let mut future = futures::future::join(handshake_server, handshake_client).boxed();
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
loop {
if let Poll::Ready((server, client)) = future.as_mut().poll(&mut cx) {
let server = server.unwrap();
let client = client.unwrap();
assert_eq!(
client.srtp_read_context.master_key,
server.srtp_write_context.master_key
);
assert_eq!(
client.srtp_read_context.master_salt,
server.srtp_write_context.master_salt
);
assert_eq!(
client.srtp_write_context.master_key,
server.srtp_read_context.master_key
);
assert_eq!(
client.srtp_write_context.master_salt,
server.srtp_read_context.master_salt
);
return;
}
}
}
}

View File

@ -1,6 +1,6 @@
use std::io::{Read, Write};
use Result;
use crate::Result;
pub trait Packet {}