Compare commits
10 Commits
f8097adbae
...
1876a53a94
Author | SHA1 | Date |
---|---|---|
|
1876a53a94 | |
|
6c0223d8a4 | |
|
3f56dd92c8 | |
|
2fe70b3900 | |
|
1444b3c063 | |
|
3510417ce4 | |
|
ee8be93855 | |
|
a2cfaf2d3e | |
|
0a8ec94f7d | |
|
6e21908845 |
23
Cargo.toml
23
Cargo.toml
|
@ -2,15 +2,28 @@
|
|||
name = "rtp"
|
||||
version = "0.1.0"
|
||||
authors = ["Takeru Ohta <phjgt308@gmail.com>"]
|
||||
edition = "2018"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
rfc5764-openssl = ["openssl", "tokio-openssl", "tokio-util/compat"]
|
||||
|
||||
[dependencies]
|
||||
trackable = "0.1"
|
||||
handy_async = "0.2"
|
||||
rust-crypto = "0.2"
|
||||
num = "0.1"
|
||||
aes-ctr = "0.6.0"
|
||||
async-trait = "0.1"
|
||||
fixedbitset = "0.1"
|
||||
futures = "0.3"
|
||||
handy_async = "0.2"
|
||||
hmac = "0.10.1"
|
||||
num = "0.1"
|
||||
sha-1 = "0.9.8"
|
||||
trackable = "0.1"
|
||||
|
||||
openssl = { version = "0.10", optional = true }
|
||||
tokio-openssl = { version = "0.4", optional = true }
|
||||
tokio-util = { version = "0.3", optional = true, default-features = false }
|
||||
|
||||
[dev-dependencies]
|
||||
clap = "2"
|
||||
fibers = "0.1"
|
||||
futures = "0.1"
|
||||
futures01 = { package = "futures", version = "0.1" }
|
||||
|
|
|
@ -12,3 +12,4 @@ RFC
|
|||
- AVPF: https://tools.ietf.org/html/rfc4585
|
||||
- SAVPF: https://tools.ietf.org/html/rfc5124
|
||||
- Multiplexing RTP and RTCP: https://tools.ietf.org/html/rfc5761
|
||||
- DTLS-SRTP: https://tools.ietf.org/html/rfc5764
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
extern crate clap;
|
||||
extern crate fibers;
|
||||
extern crate futures;
|
||||
extern crate futures01 as futures;
|
||||
#[macro_use]
|
||||
extern crate trackable;
|
||||
extern crate rtp;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use std::io::{Read, Write};
|
||||
use trackable::*;
|
||||
|
||||
use Result;
|
||||
use crate::Result;
|
||||
|
||||
pub trait ReadFrom: Sized {
|
||||
fn read_from<R: Read>(reader: &mut R) -> Result<Self>;
|
||||
|
|
13
src/lib.rs
13
src/lib.rs
|
@ -1,17 +1,16 @@
|
|||
#[macro_use]
|
||||
extern crate trackable;
|
||||
extern crate crypto;
|
||||
extern crate handy_async;
|
||||
extern crate num;
|
||||
extern crate fixedbitset;
|
||||
#[cfg(feature = "openssl")]
|
||||
extern crate openssl;
|
||||
#[cfg(feature = "tokio")]
|
||||
extern crate tokio;
|
||||
|
||||
pub use error::{Error, ErrorKind};
|
||||
pub use crate::error::{Error, ErrorKind};
|
||||
|
||||
pub mod io;
|
||||
pub mod rfc3550;
|
||||
pub mod rfc3711;
|
||||
pub mod rfc4585;
|
||||
pub mod rfc5761;
|
||||
pub mod rfc5764;
|
||||
pub mod traits;
|
||||
|
||||
mod error;
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
use handy_async::sync_io::{ReadExt, WriteExt};
|
||||
use std::io::{Read, Write};
|
||||
use trackable::*;
|
||||
|
||||
use constants::RTP_VERSION;
|
||||
use io::{ReadFrom, WriteTo};
|
||||
use traits::{self, Packet};
|
||||
use types::{NtpMiddleTimetamp, NtpTimestamp, RtpTimestamp, Ssrc, SsrcOrCsrc, U24, U5};
|
||||
use {ErrorKind, Result};
|
||||
use crate::constants::RTP_VERSION;
|
||||
use crate::io::{ReadFrom, WriteTo};
|
||||
use crate::traits::{self, Packet};
|
||||
use crate::types::{NtpMiddleTimetamp, NtpTimestamp, RtpTimestamp, Ssrc, SsrcOrCsrc, U24, U5};
|
||||
use crate::{ErrorKind, Result};
|
||||
|
||||
pub const RTCP_PACKET_TYPE_SR: u8 = 200;
|
||||
pub const RTCP_PACKET_TYPE_RR: u8 = 201;
|
||||
|
@ -510,10 +511,10 @@ impl WriteTo for SdesChunk {
|
|||
write_bytes += 1;
|
||||
|
||||
let text = item.text();
|
||||
track_assert!(text.len() <= 0xFFFF, ErrorKind::Invalid);
|
||||
track_try!(writer.write_u16be(text.len() as u16));
|
||||
track_assert!(text.len() <= 0xFF, ErrorKind::Invalid);
|
||||
track_try!(writer.write_u8(text.len() as u8));
|
||||
track_try!(writer.write_all(text.as_bytes()));
|
||||
write_bytes += 2 + text.len();
|
||||
write_bytes += 1 + text.len();
|
||||
}
|
||||
track_try!(writer.write_u8(SDES_ITEM_TYPE_END));
|
||||
write_bytes += 1;
|
||||
|
|
|
@ -1,11 +1,12 @@
|
|||
use handy_async::sync_io::{ReadExt, WriteExt};
|
||||
use std::io::{Read, Write};
|
||||
use trackable::*;
|
||||
|
||||
use constants::RTP_VERSION;
|
||||
use io::{ReadFrom, WriteTo};
|
||||
use traits::{self, Packet};
|
||||
use types::{Csrc, RtpTimestamp, Ssrc, U7};
|
||||
use {ErrorKind, Result};
|
||||
use crate::constants::RTP_VERSION;
|
||||
use crate::io::{ReadFrom, WriteTo};
|
||||
use crate::traits::{self, Packet};
|
||||
use crate::types::{Csrc, RtpTimestamp, Ssrc, U7};
|
||||
use crate::{ErrorKind, Result};
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct RtpPacketReader;
|
||||
|
|
135
src/rfc3711.rs
135
src/rfc3711.rs
|
@ -1,16 +1,19 @@
|
|||
use crypto;
|
||||
// FIXME: saveguard against two-time pad by running replay-protection on outgoing packets
|
||||
use fixedbitset::FixedBitSet;
|
||||
use handy_async::sync_io::{ReadExt, WriteExt};
|
||||
use num::BigUint;
|
||||
use num::{BigUint, Integer};
|
||||
use std::borrow::Cow;
|
||||
use std::collections::BTreeMap;
|
||||
use std::io::{Read, Write};
|
||||
use trackable::*;
|
||||
|
||||
use io::{ReadFrom, WriteTo};
|
||||
use rfc3550;
|
||||
use traits::{ReadPacket, RtcpPacket, RtpPacket, WritePacket};
|
||||
use types::{Ssrc, U48};
|
||||
use {ErrorKind, Result};
|
||||
use crate::io::{ReadFrom, WriteTo};
|
||||
use crate::rfc3550;
|
||||
use crate::traits::{ReadPacket, RtcpPacket, RtpPacket, WritePacket};
|
||||
use crate::types::{Ssrc, U48};
|
||||
use crate::{ErrorKind, Result};
|
||||
use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher};
|
||||
use hmac::{Mac, NewMac};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum EncryptionAlgorithm {
|
||||
|
@ -85,6 +88,7 @@ pub struct Context<P: Protocol> {
|
|||
pub encryption: EncryptionAlgorithm,
|
||||
pub authentication: AuthenticationAlgorithm,
|
||||
pub auth_tag_len: usize,
|
||||
pub unknown_ssrcs: usize,
|
||||
pub ssrc_context: BTreeMap<u32, SsrcContext<P>>,
|
||||
}
|
||||
|
||||
|
@ -357,6 +361,7 @@ impl Protocol for Srtcp {
|
|||
impl<P: Protocol> Context<P>
|
||||
where
|
||||
u64: From<P::PacketIndex>,
|
||||
P::PacketIndex: Into<BigUint>,
|
||||
{
|
||||
pub fn new(master_key: &[u8], master_salt: &[u8]) -> Self {
|
||||
Context {
|
||||
|
@ -366,6 +371,7 @@ where
|
|||
encryption: EncryptionAlgorithm::default(),
|
||||
authentication: AuthenticationAlgorithm::default(),
|
||||
auth_tag_len: 80 / 8,
|
||||
unknown_ssrcs: 0,
|
||||
ssrc_context: BTreeMap::new(),
|
||||
}
|
||||
}
|
||||
|
@ -386,6 +392,10 @@ where
|
|||
);
|
||||
}
|
||||
|
||||
pub fn add_unknown_ssrcs(&mut self, count: usize) {
|
||||
self.unknown_ssrcs += count;
|
||||
}
|
||||
|
||||
pub fn update_session_keys(&mut self, ssrc: Ssrc, index: P::PacketIndex) {
|
||||
let index = if self.key_derivation_rate == 0 {
|
||||
0
|
||||
|
@ -464,19 +474,18 @@ where
|
|||
let iv = BigUint::from_bytes_be(&context.session_salt_key) << 16;
|
||||
let iv = iv ^ (BigUint::from(ssrc) << 64);
|
||||
let iv = iv ^ (index.into() << 16);
|
||||
let iv = &iv.to_bytes_be()[0..context.session_encr_key.len()];
|
||||
let iv = iv ^ (BigUint::from(1_u8) << (context.session_encr_key.len() * 8));
|
||||
let iv = &iv.to_bytes_be()[1..context.session_encr_key.len() + 1];
|
||||
|
||||
let mut ctr =
|
||||
aes_ctr::Aes128Ctr::new_var(&context.session_encr_key, iv).expect("Correct Key Length");
|
||||
|
||||
let mut ctr = crypto::aes::ctr(
|
||||
crypto::aes::KeySize::KeySize128,
|
||||
&context.session_encr_key,
|
||||
iv,
|
||||
);
|
||||
let block_size = context.session_encr_key.len();
|
||||
|
||||
for block in encrypted.chunks(block_size) {
|
||||
let old_len = decrypted.len();
|
||||
decrypted.resize(old_len + block.len(), 0);
|
||||
ctr.process(block, &mut decrypted[old_len..]);
|
||||
decrypted.extend_from_slice(block);
|
||||
ctr.apply_keystream(&mut decrypted[old_len..]);
|
||||
}
|
||||
}
|
||||
pub fn decrypt(
|
||||
|
@ -499,19 +508,17 @@ where
|
|||
let iv = BigUint::from_bytes_be(&context.session_salt_key) << 16;
|
||||
let iv = iv ^ (BigUint::from(ssrc) << 64);
|
||||
let iv = iv ^ (index.into() << 16);
|
||||
let iv = &iv.to_bytes_be()[0..context.session_encr_key.len()];
|
||||
let iv = iv ^ (BigUint::from(1_u8) << (context.session_encr_key.len() * 8));
|
||||
let iv = &iv.to_bytes_be()[1..context.session_encr_key.len() + 1];
|
||||
|
||||
let mut ctr = crypto::aes::ctr(
|
||||
crypto::aes::KeySize::KeySize128,
|
||||
&context.session_encr_key,
|
||||
iv,
|
||||
);
|
||||
let mut ctr =
|
||||
aes_ctr::Aes128Ctr::new_var(&context.session_encr_key, iv).expect("Correct Key Length");
|
||||
let block_size = context.session_encr_key.len();
|
||||
|
||||
for block in plaintext.chunks(block_size) {
|
||||
let old_len = encrypted.len();
|
||||
encrypted.resize(old_len + block.len(), 0);
|
||||
ctr.process(block, &mut encrypted[old_len..]);
|
||||
encrypted.extend_from_slice(block);
|
||||
ctr.apply_keystream(&mut encrypted[old_len..]);
|
||||
}
|
||||
}
|
||||
pub fn encrypt(
|
||||
|
@ -528,7 +535,24 @@ where
|
|||
pub fn process_incoming(&mut self, packet: &[u8]) -> Result<Vec<u8>> {
|
||||
// Step 1: determining the correct context
|
||||
let ssrc = track_try!(P::read_ssrc(packet));
|
||||
track_assert!(self.ssrc_context.contains_key(&ssrc), ErrorKind::Invalid, "Unknown SSRC {}", ssrc);
|
||||
if !self.ssrc_context.contains_key(&ssrc) {
|
||||
track_assert!(
|
||||
self.unknown_ssrcs > 0,
|
||||
ErrorKind::Invalid,
|
||||
"Unknown SSRC {}",
|
||||
ssrc
|
||||
);
|
||||
self.unknown_ssrcs -= 1;
|
||||
let ssrc_context = SsrcContext {
|
||||
replay_window_head: 0,
|
||||
replay_window: FixedBitSet::with_capacity(128),
|
||||
session_encr_key: vec![0; 128 / 8],
|
||||
session_salt_key: vec![0; 112 / 8],
|
||||
session_auth_key: vec![0; 160 / 8],
|
||||
protocol_specific: P::default(),
|
||||
};
|
||||
self.ssrc_context.insert(ssrc, ssrc_context);
|
||||
}
|
||||
|
||||
// Step 2: Determine index of the packet
|
||||
let index = track_try!(P::determine_incoming_packet_index(
|
||||
|
@ -600,7 +624,24 @@ where
|
|||
pub fn process_outgoing(&mut self, packet: &[u8]) -> Result<Vec<u8>> {
|
||||
// Step 1: determining the correct context
|
||||
let ssrc = track_try!(P::read_ssrc(packet));
|
||||
track_assert!(self.ssrc_context.contains_key(&ssrc), ErrorKind::Invalid, "Unknown SSRC {}", ssrc);
|
||||
if !self.ssrc_context.contains_key(&ssrc) {
|
||||
track_assert!(
|
||||
self.unknown_ssrcs > 0,
|
||||
ErrorKind::Invalid,
|
||||
"Unknown SSRC {}",
|
||||
ssrc
|
||||
);
|
||||
self.unknown_ssrcs -= 1;
|
||||
let ssrc_context = SsrcContext {
|
||||
replay_window_head: 0,
|
||||
replay_window: FixedBitSet::with_capacity(128),
|
||||
session_encr_key: vec![0; 128 / 8],
|
||||
session_salt_key: vec![0; 112 / 8],
|
||||
session_auth_key: vec![0; 160 / 8],
|
||||
protocol_specific: P::default(),
|
||||
};
|
||||
self.ssrc_context.insert(ssrc, ssrc_context);
|
||||
}
|
||||
|
||||
// Step 2: Determine index of the packet
|
||||
let index = track_try!(P::determine_outgoing_packet_index(
|
||||
|
@ -768,28 +809,26 @@ where
|
|||
}
|
||||
|
||||
fn hmac_hash_sha1(key: &[u8], data: &[u8]) -> Vec<u8> {
|
||||
use crypto::mac::Mac;
|
||||
let mut hmac = crypto::hmac::Hmac::new(crypto::sha1::Sha1::new(), key);
|
||||
hmac.input(data);
|
||||
Vec::from(hmac.result().code())
|
||||
let mut hmac = hmac::Hmac::<sha1::Sha1>::new_varkey(key).expect("Correct Key Length");
|
||||
hmac.update(data);
|
||||
hmac.finalize().into_bytes().to_vec()
|
||||
}
|
||||
|
||||
fn prf_n(master_key: &[u8], x: BigUint, n: usize) -> Vec<u8> {
|
||||
// https://tools.ietf.org/html/rfc3711#section-4.1.1
|
||||
let mut output = Vec::new();
|
||||
let mut ctr = crypto::aes::ctr(
|
||||
crypto::aes::KeySize::KeySize128,
|
||||
master_key,
|
||||
&(x << 16).to_bytes_be(),
|
||||
);
|
||||
let mut ctr = aes_ctr::Aes128Ctr::new_var(master_key, &(x << 16).to_bytes_be())
|
||||
.expect("Correct Key Length");
|
||||
|
||||
output.reserve_exact(n.next_multiple_of(&16));
|
||||
|
||||
for i in 0.. {
|
||||
let old_len = output.len();
|
||||
let new_len = output.len() + 16;
|
||||
output.resize(new_len, 0);
|
||||
|
||||
let mut input = [0; 16];
|
||||
(&mut input[8..]).write_u64be(i).unwrap();
|
||||
ctr.process(&input[..], &mut output[old_len..]);
|
||||
output.extend_from_slice(&[0u8; 8]);
|
||||
output.extend_from_slice(&u64::to_be_bytes(i));
|
||||
|
||||
ctr.apply_keystream(&mut output[old_len..]);
|
||||
if output.len() >= n {
|
||||
break;
|
||||
}
|
||||
|
@ -799,10 +838,10 @@ fn prf_n(master_key: &[u8], x: BigUint, n: usize) -> Vec<u8> {
|
|||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
pub(crate) mod test {
|
||||
use super::*;
|
||||
use rfc3550;
|
||||
use rfc4585;
|
||||
use crate::rfc3550;
|
||||
use crate::rfc4585;
|
||||
|
||||
#[test]
|
||||
fn rtp_packet_index_estimation_works() {
|
||||
|
@ -825,14 +864,14 @@ mod test {
|
|||
assert_eq!(estimate(&context, 10001), i(roc_p1, 10001)); // roc+1
|
||||
}
|
||||
|
||||
const TEST_MASTER_KEY: &[u8] = &[
|
||||
pub(crate) const TEST_MASTER_KEY: &[u8] = &[
|
||||
211, 77, 116, 243, 125, 116, 231, 95, 59, 219, 79, 118, 241, 189, 244, 119,
|
||||
];
|
||||
const TEST_MASTER_SALT: &[u8] = &[
|
||||
pub(crate) const TEST_MASTER_SALT: &[u8] = &[
|
||||
127, 31, 227, 93, 120, 247, 126, 117, 231, 159, 123, 235, 95, 122,
|
||||
];
|
||||
const TEST_SRTP_SSRC: Ssrc = 446919554;
|
||||
const TEST_SRTP_PACKET: &[u8] = &[
|
||||
pub(crate) const TEST_SRTP_SSRC: Ssrc = 446919554;
|
||||
pub(crate) const TEST_SRTP_PACKET: &[u8] = &[
|
||||
128, 0, 3, 92, 222, 161, 6, 76, 26, 163, 115, 130, 222, 0, 143, 87, 0, 227, 123, 91, 200,
|
||||
238, 141, 220, 9, 191, 52, 111, 100, 62, 220, 158, 211, 79, 184, 199, 79, 182, 9, 248, 170,
|
||||
82, 125, 152, 143, 206, 8, 152, 80, 207, 27, 183, 141, 77, 33, 60, 101, 180, 210, 146, 139,
|
||||
|
@ -844,8 +883,8 @@ mod test {
|
|||
7, 52, 191, 129, 239, 86, 78, 172, 229, 178, 112, 22, 125, 191, 164, 17, 193, 24, 152, 197,
|
||||
146, 94, 74, 156, 171, 245, 239, 220, 205, 145, 206,
|
||||
];
|
||||
const TEST_SRTCP_SSRC: Ssrc = 3270675037;
|
||||
const TEST_SRTCP_PACKET: &[u8] = &[
|
||||
pub(crate) const TEST_SRTCP_SSRC: Ssrc = 3270675037;
|
||||
pub(crate) const TEST_SRTCP_PACKET: &[u8] = &[
|
||||
128, 201, 0, 1, 194, 242, 138, 93, 177, 31, 99, 88, 187, 209, 173, 181, 135, 18, 79, 59,
|
||||
119, 153, 115, 34, 75, 94, 96, 29, 32, 14, 118, 86, 145, 159, 203, 174, 225, 34, 196, 229,
|
||||
39, 22, 174, 54, 198, 56, 179, 171, 111, 229, 48, 234, 138, 249, 127, 11, 86, 94, 40, 213,
|
||||
|
|
|
@ -1,12 +1,13 @@
|
|||
use handy_async::sync_io::{ReadExt, WriteExt};
|
||||
use std::io::{Read, Write};
|
||||
use trackable::*;
|
||||
|
||||
use constants::RTP_VERSION;
|
||||
use io::{ReadFrom, WriteTo};
|
||||
use rfc3550;
|
||||
use traits::{self, Packet};
|
||||
use types::{Ssrc, U13, U5, U6, U7};
|
||||
use {ErrorKind, Result};
|
||||
use crate::constants::RTP_VERSION;
|
||||
use crate::io::{ReadFrom, WriteTo};
|
||||
use crate::rfc3550;
|
||||
use crate::traits::{self, Packet};
|
||||
use crate::types::{Ssrc, U13, U5, U6, U7};
|
||||
use crate::{ErrorKind, Result};
|
||||
|
||||
pub const RTCP_PACKET_TYPE_RTPFB: u8 = 205;
|
||||
pub const RTCP_PACKET_TYPE_PSFB: u8 = 206;
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::io::{Read, Write};
|
||||
use trackable::*;
|
||||
|
||||
use traits::{Packet, ReadPacket, RtcpPacket, RtpPacket, WritePacket};
|
||||
use Result;
|
||||
use crate::traits::{Packet, ReadPacket, RtcpPacket, RtpPacket, WritePacket};
|
||||
use crate::Result;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct MuxPacketReader<T, U> {
|
||||
|
|
|
@ -0,0 +1,679 @@
|
|||
// FIXME: the current SRTP implementation does not support the maximum_lifetime parameter
|
||||
|
||||
#[cfg(feature = "rfc5764-openssl")]
|
||||
mod openssl;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::io::{AsyncRead, AsyncWrite};
|
||||
use futures::ready;
|
||||
use futures::{Sink, Stream};
|
||||
use std::collections::VecDeque;
|
||||
use std::io;
|
||||
use std::io::Read;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::sync::Mutex;
|
||||
use std::task::Context;
|
||||
use std::task::Poll;
|
||||
|
||||
use crate::rfc3711::{
|
||||
AuthenticationAlgorithm, Context as SrtpContext, EncryptionAlgorithm, Srtcp, Srtp,
|
||||
};
|
||||
use crate::types::Ssrc;
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct SrtpProtectionProfile {
|
||||
pub name: &'static str,
|
||||
pub cipher: EncryptionAlgorithm,
|
||||
pub cipher_key_length: u8,
|
||||
pub cipher_salt_length: u8,
|
||||
pub maximum_lifetime: u32,
|
||||
pub auth_function: AuthenticationAlgorithm,
|
||||
pub auth_key_length: u8,
|
||||
pub auth_salt_length: u8,
|
||||
}
|
||||
|
||||
impl SrtpProtectionProfile {
|
||||
pub const AES128_CM_HMAC_SHA1_80: SrtpProtectionProfile = SrtpProtectionProfile {
|
||||
name: "SRTP_AES128_CM_SHA1_80",
|
||||
cipher: EncryptionAlgorithm::AesCm,
|
||||
cipher_key_length: 128,
|
||||
cipher_salt_length: 112,
|
||||
maximum_lifetime: 2 ^ 31,
|
||||
auth_function: AuthenticationAlgorithm::HmacSha1,
|
||||
auth_key_length: 160,
|
||||
auth_salt_length: 80,
|
||||
};
|
||||
// AES128_CM_HMAC_SHA1_32 is not supported due to recommendation in rfc3711#5.2
|
||||
// NULL_HMAC_SHA1_80 is not supported because the NULL cipher isn't implemented
|
||||
// NULL_HMAC_SHA1_32 is not supported due to recommendation in rfc3711#5.2 (and lack of NULL)
|
||||
|
||||
pub const RECOMMENDED: &'static [&'static SrtpProtectionProfile] =
|
||||
&[&SrtpProtectionProfile::AES128_CM_HMAC_SHA1_80];
|
||||
pub const SUPPORTED: &'static [&'static SrtpProtectionProfile] =
|
||||
&[&SrtpProtectionProfile::AES128_CM_HMAC_SHA1_80];
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait DtlsBuilder<S> {
|
||||
type Instance: Dtls<S>;
|
||||
|
||||
async fn handshake(self, stream: S) -> Result<Self::Instance, io::Error>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin + 'async_trait;
|
||||
}
|
||||
|
||||
pub trait Dtls<S>: AsyncRead + AsyncWrite + Unpin {
|
||||
fn is_server_side(&self) -> bool;
|
||||
fn export_key(&mut self, exporter_label: &str, length: usize) -> Vec<u8>;
|
||||
}
|
||||
|
||||
pub struct DtlsSrtpMuxer<S> {
|
||||
inner: S,
|
||||
dtls_buf: VecDeque<Vec<u8>>,
|
||||
srtp_buf: VecDeque<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite> DtlsSrtpMuxer<S> {
|
||||
fn new(inner: S) -> Self {
|
||||
DtlsSrtpMuxer {
|
||||
inner,
|
||||
dtls_buf: VecDeque::new(),
|
||||
srtp_buf: VecDeque::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> DtlsSrtpMuxer<S> {
|
||||
fn into_parts(self) -> (DtlsSrtpMuxerPart<S>, DtlsSrtpMuxerPart<S>) {
|
||||
let muxer = Arc::new(Mutex::new(self));
|
||||
let dtls = DtlsSrtpMuxerPart {
|
||||
muxer: muxer.clone(),
|
||||
srtp: false,
|
||||
};
|
||||
let srtp = DtlsSrtpMuxerPart { muxer, srtp: true };
|
||||
(dtls, srtp)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + Unpin> DtlsSrtpMuxer<S> {
|
||||
fn read(
|
||||
&mut self,
|
||||
cx: &mut Context,
|
||||
want_srtp: bool,
|
||||
dst_buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
{
|
||||
let want_buf = if want_srtp {
|
||||
&mut self.srtp_buf
|
||||
} else {
|
||||
&mut self.dtls_buf
|
||||
};
|
||||
if let Some(buf) = want_buf.pop_front() {
|
||||
return Poll::Ready((&buf[..]).read(dst_buf));
|
||||
}
|
||||
}
|
||||
|
||||
let mut buf = [0u8; 2048];
|
||||
let len = ready!(Pin::new(&mut self.inner).poll_read(cx, &mut buf))?;
|
||||
if len == 0 {
|
||||
return Poll::Ready(Ok(0));
|
||||
}
|
||||
let mut buf = &buf[..len];
|
||||
// Demux SRTP and DTLS as per https://tools.ietf.org/html/rfc5764#section-5.1.2
|
||||
let is_srtp = buf[0] >= 128 && buf[0] <= 191;
|
||||
if is_srtp == want_srtp {
|
||||
Poll::Ready(buf.read(dst_buf))
|
||||
} else {
|
||||
if is_srtp {
|
||||
&mut self.srtp_buf
|
||||
} else {
|
||||
&mut self.dtls_buf
|
||||
}
|
||||
.push_back(buf.to_vec());
|
||||
// We have to make sure we're not waiting for, e.g., a srtp packet when
|
||||
// we just got a dtls packet and the remote is waiting on a reply to it.
|
||||
// So, to prevent this kind of deadlock, we abort the current read-path
|
||||
// by pretending that we're doing non-blocking io (even if we aren't)
|
||||
// to get back to where we can enter the other (in the example: the dtls)
|
||||
// read-path and process the packet we just read.
|
||||
Poll::Pending // FIXME this doesn't see sound, shouldn't we store the waker!?
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DtlsSrtpMuxerPart<S> {
|
||||
muxer: Arc<Mutex<DtlsSrtpMuxer<S>>>,
|
||||
srtp: bool,
|
||||
}
|
||||
|
||||
impl<S> AsyncRead for DtlsSrtpMuxerPart<S>
|
||||
where
|
||||
S: AsyncRead + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.muxer.lock().unwrap().read(cx, self.srtp, buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> AsyncWrite for DtlsSrtpMuxerPart<S>
|
||||
where
|
||||
S: AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.muxer.lock().unwrap().inner).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.muxer.lock().unwrap().inner).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
ready!(self.as_mut().poll_flush(cx))?;
|
||||
Pin::new(&mut self.muxer.lock().unwrap().inner).poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
pub struct DtlsSrtp<S: AsyncRead + AsyncWrite, D: DtlsBuilder<DtlsSrtpMuxerPart<S>>> {
|
||||
stream: DtlsSrtpMuxerPart<S>,
|
||||
#[allow(dead_code)] // we'll need this once we implement re-keying
|
||||
dtls: D::Instance,
|
||||
srtp_read_context: SrtpContext<Srtp>,
|
||||
srtcp_read_context: SrtpContext<Srtcp>,
|
||||
srtp_write_context: SrtpContext<Srtp>,
|
||||
srtcp_write_context: SrtpContext<Srtcp>,
|
||||
sink_buf: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
impl<S, D> DtlsSrtp<S, D>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
|
||||
{
|
||||
pub async fn handshake(stream: S, dtls_builder: D) -> Result<DtlsSrtp<S, D>, io::Error> {
|
||||
let (stream_dtls, stream_srtp) = DtlsSrtpMuxer::new(stream).into_parts();
|
||||
let dtls = dtls_builder.handshake(stream_dtls).await?;
|
||||
Ok(DtlsSrtp::new(stream_srtp, dtls))
|
||||
}
|
||||
|
||||
fn new(stream: DtlsSrtpMuxerPart<S>, mut dtls: D::Instance) -> Self {
|
||||
const EXPORTER_LABEL: &str = "EXTRACTOR-dtls_srtp";
|
||||
const KEY_LEN: usize = 16;
|
||||
const SALT_LEN: usize = 14;
|
||||
const EXPORT_LEN: usize = (KEY_LEN + SALT_LEN) * 2;
|
||||
|
||||
let key_material = dtls.export_key(EXPORTER_LABEL, EXPORT_LEN);
|
||||
let client_material = (
|
||||
&(&key_material[0..])[..KEY_LEN],
|
||||
&(&key_material[KEY_LEN * 2..])[..SALT_LEN],
|
||||
);
|
||||
let server_material = (
|
||||
&(&key_material[KEY_LEN..])[..KEY_LEN],
|
||||
&(&key_material[KEY_LEN * 2 + SALT_LEN..])[..SALT_LEN],
|
||||
);
|
||||
let (read_material, write_material) = if dtls.is_server_side() {
|
||||
(client_material, server_material)
|
||||
} else {
|
||||
(server_material, client_material)
|
||||
};
|
||||
let (read_key, read_salt) = read_material;
|
||||
let (write_key, write_salt) = write_material;
|
||||
DtlsSrtp {
|
||||
stream,
|
||||
dtls,
|
||||
srtp_read_context: SrtpContext::new(&read_key, &read_salt),
|
||||
srtcp_read_context: SrtpContext::new(&read_key, &read_salt),
|
||||
srtp_write_context: SrtpContext::new(&write_key, &write_salt),
|
||||
srtcp_write_context: SrtpContext::new(&write_key, &write_salt),
|
||||
sink_buf: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn add_incoming_ssrc(&mut self, ssrc: Ssrc) {
|
||||
self.srtp_read_context.add_ssrc(ssrc);
|
||||
self.srtcp_read_context.add_ssrc(ssrc);
|
||||
}
|
||||
|
||||
pub fn add_incoming_unknown_ssrcs(&mut self, count: usize) {
|
||||
self.srtp_read_context.add_unknown_ssrcs(count);
|
||||
self.srtcp_read_context.add_unknown_ssrcs(count);
|
||||
}
|
||||
|
||||
pub fn add_outgoing_ssrc(&mut self, ssrc: Ssrc) {
|
||||
self.srtp_write_context.add_ssrc(ssrc);
|
||||
self.srtcp_write_context.add_ssrc(ssrc);
|
||||
}
|
||||
|
||||
pub fn add_outgoing_unknown_ssrcs(&mut self, count: usize) {
|
||||
self.srtp_write_context.add_unknown_ssrcs(count);
|
||||
self.srtcp_write_context.add_unknown_ssrcs(count);
|
||||
}
|
||||
|
||||
fn process_incoming_srtp_packet(&mut self, buf: &[u8]) -> Option<Vec<u8>> {
|
||||
// Demux SRTP and SRTCP packets as per https://tools.ietf.org/html/rfc5761#section-4
|
||||
let payload_type = buf[1] & 0x7f;
|
||||
if 64 <= payload_type && payload_type <= 95 {
|
||||
self.srtcp_read_context.process_incoming(buf).ok()
|
||||
} else {
|
||||
self.srtp_read_context.process_incoming(buf).ok()
|
||||
}
|
||||
}
|
||||
|
||||
fn process_outgoing_srtp_packet(&mut self, buf: &[u8]) -> Option<Vec<u8>> {
|
||||
// Demux SRTP and SRTCP packets as per https://tools.ietf.org/html/rfc5761#section-4
|
||||
let payload_type = buf[1] & 0x7f;
|
||||
if 64 <= payload_type && payload_type <= 95 {
|
||||
self.srtcp_write_context.process_outgoing(buf).ok()
|
||||
} else {
|
||||
self.srtp_write_context.process_outgoing(buf).ok()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, D> AsyncRead for DtlsSrtp<S, D>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
|
||||
{
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
let item = ready!(self.poll_next(cx)?);
|
||||
if let Some(item) = item {
|
||||
Poll::Ready((&item[..]).read(buf))
|
||||
} else {
|
||||
Poll::Ready(Ok(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, D> Stream for DtlsSrtp<S, D>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
|
||||
{
|
||||
type Item = io::Result<Vec<u8>>;
|
||||
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
|
||||
loop {
|
||||
// Check if we have an SRTP packet in the queue
|
||||
if self.stream.muxer.lock().unwrap().srtp_buf.is_empty() {
|
||||
// if we don't, then poll the dtls layer which will read from the
|
||||
// underlying packet stream and produce either dtls data or fill
|
||||
// the SRTP packet queue or fail due to WouldBlock
|
||||
/* FIXME polling dtls eventually errs with a read timeout, for some reason
|
||||
* it does indeed send repeated "Change Cipher Spec" and "Encrypted Handshake
|
||||
* Message" as if its expecting a response to those but none is sent by FF?
|
||||
match self.dtls.read(buf) {
|
||||
Ok(len) => return Ok(len),
|
||||
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
|
||||
// Either we're using non-blocking io and there's no more data
|
||||
// available, or we received an SRTP packet which needs handling
|
||||
}
|
||||
err => return err,
|
||||
};
|
||||
*/
|
||||
}
|
||||
|
||||
// Read and handle the next SRTP packet from the queue
|
||||
let mut raw_buf = [0u8; 2048];
|
||||
let len = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut raw_buf))?;
|
||||
if len == 0 {
|
||||
return Poll::Ready(None);
|
||||
}
|
||||
let raw_buf = &raw_buf[..len];
|
||||
return match self.process_incoming_srtp_packet(raw_buf) {
|
||||
Some(result) => Poll::Ready(Some(Ok(result))),
|
||||
None => {
|
||||
// FIXME: check rfc for whether this should be dropped silently
|
||||
continue; // packet failed to auth or decrypt, drop it and try the next one
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, D> AsyncWrite for DtlsSrtp<S, D>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
|
||||
{
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
if let Some(buf) = self.process_outgoing_srtp_packet(buf) {
|
||||
Pin::new(&mut self.stream).poll_write(cx, &buf)
|
||||
} else {
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.stream).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.stream).poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, D> Sink<&[u8]> for DtlsSrtp<S, D>
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Unpin,
|
||||
D: DtlsBuilder<DtlsSrtpMuxerPart<S>>,
|
||||
{
|
||||
type Error = io::Error;
|
||||
|
||||
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
let _ = Sink::poll_flush(self.as_mut(), cx)?;
|
||||
if self.sink_buf.is_none() {
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
|
||||
fn start_send(mut self: Pin<&mut Self>, item: &[u8]) -> io::Result<()> {
|
||||
self.sink_buf = self.process_outgoing_srtp_packet(item.as_ref());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
if let Some(buf) = self.sink_buf.take() {
|
||||
match Pin::new(&mut self.stream).poll_write(cx, &buf) {
|
||||
Poll::Pending => {
|
||||
self.sink_buf = Some(buf);
|
||||
return Poll::Pending;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Pin::new(&mut self.stream).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.stream).poll_close(cx)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
pub(crate) mod test {
|
||||
use super::*;
|
||||
use crate::rfc3711::test::{
|
||||
TEST_MASTER_KEY, TEST_MASTER_SALT, TEST_SRTCP_PACKET, TEST_SRTCP_SSRC, TEST_SRTP_PACKET,
|
||||
TEST_SRTP_SSRC,
|
||||
};
|
||||
|
||||
use futures::{AsyncReadExt, AsyncWriteExt, FutureExt};
|
||||
|
||||
macro_rules! read_now {
|
||||
( $expr:expr, $buf:expr ) => {
|
||||
$expr
|
||||
.read($buf)
|
||||
.now_or_never()
|
||||
.expect("would block")
|
||||
.expect("reading")
|
||||
};
|
||||
}
|
||||
|
||||
macro_rules! write_now {
|
||||
( $expr:expr, $buf:expr ) => {
|
||||
$expr
|
||||
.write($buf)
|
||||
.now_or_never()
|
||||
.expect("would block")
|
||||
.expect("writing")
|
||||
};
|
||||
}
|
||||
|
||||
struct DummyDtlsBuilder;
|
||||
struct DummyDtls<S> {
|
||||
connected: bool,
|
||||
stream: S,
|
||||
}
|
||||
|
||||
const DUMMY_DTLS_NOOP: &[u8] = &[20, 42];
|
||||
const DUMMY_DTLS_HELLO: &[u8] = &[62, 42];
|
||||
const DUMMY_DTLS_CONNECTED: &[u8] = &[63, 42];
|
||||
|
||||
impl DummyDtlsBuilder {
|
||||
fn new() -> Self {
|
||||
DummyDtlsBuilder {}
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin + Send> DtlsBuilder<S> for DummyDtlsBuilder {
|
||||
type Instance = DummyDtls<S>;
|
||||
|
||||
async fn handshake(self, mut stream: S) -> Result<Self::Instance, io::Error>
|
||||
where
|
||||
S: 'async_trait,
|
||||
{
|
||||
let _ = stream.write(DUMMY_DTLS_HELLO).await;
|
||||
let mut dtls = DummyDtls {
|
||||
stream,
|
||||
connected: false,
|
||||
};
|
||||
loop {
|
||||
let _ = futures::poll!(dtls.read(&mut []));
|
||||
if dtls.connected {
|
||||
break;
|
||||
} else {
|
||||
futures::pending!();
|
||||
}
|
||||
}
|
||||
Ok(dtls)
|
||||
}
|
||||
}
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> Dtls<S> for DummyDtls<S> {
|
||||
fn is_server_side(&self) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
fn export_key(&mut self, exporter_label: &str, length: usize) -> Vec<u8> {
|
||||
assert_eq!(exporter_label, "EXTRACTOR-dtls_srtp");
|
||||
let mut buf = Vec::new();
|
||||
buf.extend(TEST_MASTER_KEY);
|
||||
buf.extend(TEST_MASTER_KEY);
|
||||
let idx = buf.len() - 1;
|
||||
buf[idx] = 0;
|
||||
buf.extend(TEST_MASTER_SALT);
|
||||
buf.extend(TEST_MASTER_SALT);
|
||||
let idx = buf.len() - 1;
|
||||
buf[idx] = 0;
|
||||
assert_eq!(length, buf.len());
|
||||
buf
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncRead for DummyDtls<S> {
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
_dst: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
loop {
|
||||
let mut buf = [0u8; 2048];
|
||||
let len = ready!(Pin::new(&mut self.stream).poll_read(cx, &mut buf))?;
|
||||
assert_eq!(len, 2);
|
||||
assert_eq!(buf[1], 42);
|
||||
match &buf[..len] {
|
||||
DUMMY_DTLS_NOOP => {}
|
||||
DUMMY_DTLS_HELLO => {
|
||||
let _ = Pin::new(&mut self.stream).poll_write(cx, DUMMY_DTLS_CONNECTED)?;
|
||||
}
|
||||
DUMMY_DTLS_CONNECTED => {
|
||||
self.connected = true;
|
||||
}
|
||||
_ => panic!(),
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for DummyDtls<S> {
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
Pin::new(&mut self.stream).poll_write(cx, buf)
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.stream).poll_flush(cx)
|
||||
}
|
||||
|
||||
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Pin::new(&mut self.stream).poll_flush(cx)
|
||||
}
|
||||
}
|
||||
|
||||
type PacketBufArc = Arc<Mutex<VecDeque<Vec<u8>>>>;
|
||||
pub(crate) struct DummyTransport {
|
||||
read_buf: PacketBufArc,
|
||||
write_buf: PacketBufArc,
|
||||
}
|
||||
|
||||
impl DummyTransport {
|
||||
pub fn new() -> (Self, Self) {
|
||||
let read_buf = Arc::new(Mutex::new(VecDeque::new()));
|
||||
let write_buf = Arc::new(Mutex::new(VecDeque::new()));
|
||||
(
|
||||
DummyTransport {
|
||||
read_buf: read_buf.clone(),
|
||||
write_buf: write_buf.clone(),
|
||||
},
|
||||
DummyTransport {
|
||||
read_buf: write_buf.clone(),
|
||||
write_buf: read_buf.clone(),
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
pub fn read_packet(&mut self) -> Option<Vec<u8>> {
|
||||
self.read_buf.lock().unwrap().pop_front()
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncRead for DummyTransport {
|
||||
fn poll_read(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context,
|
||||
buf: &mut [u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
match self.read_buf.lock().unwrap().pop_front() {
|
||||
None => Poll::Pending,
|
||||
Some(elem) => Poll::Ready(std::io::Read::read(&mut &elem[..], buf)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl AsyncWrite for DummyTransport {
|
||||
fn poll_write(
|
||||
self: Pin<&mut Self>,
|
||||
_cx: &mut Context,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
self.write_buf.lock().unwrap().push_back(buf.to_vec());
|
||||
Poll::Ready(Ok(buf.len()))
|
||||
}
|
||||
|
||||
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
|
||||
fn poll_close(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<io::Result<()>> {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn new_dtls_srtp() -> (DummyTransport, DtlsSrtp<DummyTransport, DummyDtlsBuilder>) {
|
||||
let (mut stream, dummy_stream) = DummyTransport::new();
|
||||
write_now!(stream, DUMMY_DTLS_CONNECTED);
|
||||
let mut dtls_srtp = DtlsSrtp::handshake(dummy_stream, DummyDtlsBuilder::new())
|
||||
.now_or_never()
|
||||
.expect("DTLS-SRTP handshake did not complete")
|
||||
.expect("DTL-SRTP handshake failed");
|
||||
assert_eq!(&stream.read_packet().unwrap()[..], DUMMY_DTLS_HELLO);
|
||||
dtls_srtp.add_incoming_ssrc(TEST_SRTP_SSRC);
|
||||
dtls_srtp.add_incoming_ssrc(TEST_SRTCP_SSRC);
|
||||
dtls_srtp.add_outgoing_ssrc(TEST_SRTP_SSRC);
|
||||
dtls_srtp.add_outgoing_ssrc(TEST_SRTCP_SSRC);
|
||||
(stream, dtls_srtp)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn polls_dtls_layer_for_keys() {
|
||||
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
||||
let (mut stream, dummy_stream) = DummyTransport::new();
|
||||
let mut handshake = DtlsSrtp::handshake(dummy_stream, DummyDtlsBuilder::new()).boxed();
|
||||
match handshake.as_mut().poll(&mut cx) {
|
||||
Poll::Pending => {}
|
||||
_ => panic!(),
|
||||
};
|
||||
|
||||
// too early, should be discarded
|
||||
write_now!(stream, TEST_SRTP_PACKET);
|
||||
|
||||
match handshake.as_mut().poll(&mut cx) {
|
||||
Poll::Pending => {}
|
||||
_ => panic!(),
|
||||
};
|
||||
assert_eq!(&stream.read_packet().unwrap()[..], DUMMY_DTLS_HELLO);
|
||||
|
||||
write_now!(stream, DUMMY_DTLS_HELLO);
|
||||
match handshake.as_mut().poll(&mut cx) {
|
||||
Poll::Pending => {}
|
||||
_ => panic!(),
|
||||
};
|
||||
assert_eq!(&stream.read_packet().unwrap()[..], DUMMY_DTLS_CONNECTED);
|
||||
|
||||
write_now!(stream, DUMMY_DTLS_CONNECTED);
|
||||
match handshake.as_mut().poll(&mut cx) {
|
||||
Poll::Ready(_) => {}
|
||||
_ => panic!(),
|
||||
};
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn decryption_of_incoming_srtp_and_srtcp_works() {
|
||||
let mut buf = [0u8; 2048];
|
||||
let (mut stream, mut dtls_srtp) = new_dtls_srtp();
|
||||
|
||||
write_now!(stream, TEST_SRTP_PACKET);
|
||||
write_now!(stream, TEST_SRTCP_PACKET);
|
||||
assert_eq!(read_now!(dtls_srtp, &mut buf), 182); // srtp
|
||||
assert_eq!(read_now!(dtls_srtp, &mut buf), 68); // srtcp
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn does_not_allow_replay_of_packets() {
|
||||
let mut buf = [0u8; 2048];
|
||||
let (mut stream, mut dtls_srtp) = new_dtls_srtp();
|
||||
|
||||
write_now!(stream, TEST_SRTP_PACKET);
|
||||
write_now!(stream, TEST_SRTP_PACKET);
|
||||
write_now!(stream, TEST_SRTP_PACKET);
|
||||
assert_eq!(read_now!(dtls_srtp, &mut buf), 182);
|
||||
assert!(dtls_srtp.read(&mut buf).now_or_never().is_none(),);
|
||||
|
||||
write_now!(stream, TEST_SRTCP_PACKET);
|
||||
write_now!(stream, TEST_SRTCP_PACKET);
|
||||
write_now!(stream, TEST_SRTCP_PACKET);
|
||||
assert_eq!(read_now!(dtls_srtp, &mut buf), 68);
|
||||
assert!(dtls_srtp.read(&mut buf).now_or_never().is_none(),);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,138 @@
|
|||
use async_trait::async_trait;
|
||||
use futures::io::{AsyncRead, AsyncWrite};
|
||||
use std::io;
|
||||
use tokio_openssl::SslStream;
|
||||
use tokio_util::compat::{Compat, FuturesAsyncReadCompatExt, Tokio02AsyncReadCompatExt};
|
||||
|
||||
use openssl::ssl::{ConnectConfiguration, SslAcceptorBuilder};
|
||||
|
||||
use crate::rfc5764::{Dtls, DtlsBuilder, SrtpProtectionProfile};
|
||||
|
||||
type CompatSslStream<S> = Compat<SslStream<Compat<S>>>;
|
||||
|
||||
#[async_trait]
|
||||
impl<S: AsyncRead + AsyncWrite + Send + Unpin> DtlsBuilder<S> for ConnectConfiguration {
|
||||
type Instance = CompatSslStream<S>;
|
||||
|
||||
async fn handshake(mut self, stream: S) -> Result<Self::Instance, io::Error>
|
||||
where
|
||||
S: 'async_trait,
|
||||
{
|
||||
let profiles_str: String = SrtpProtectionProfile::RECOMMENDED
|
||||
.iter()
|
||||
.map(|profile| profile.name.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(":");
|
||||
self.set_tlsext_use_srtp(&profiles_str).unwrap();
|
||||
match tokio_openssl::connect(self, "invalid", stream.compat()).await {
|
||||
Ok(stream) => Ok(stream.compat()),
|
||||
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "handshake error")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<S: AsyncRead + AsyncWrite + Send + Unpin> DtlsBuilder<S> for SslAcceptorBuilder {
|
||||
type Instance = CompatSslStream<S>;
|
||||
|
||||
async fn handshake(mut self, stream: S) -> Result<Self::Instance, io::Error>
|
||||
where
|
||||
S: 'async_trait,
|
||||
{
|
||||
let profiles_str: String = SrtpProtectionProfile::RECOMMENDED
|
||||
.iter()
|
||||
.map(|profile| profile.name.to_string())
|
||||
.collect::<Vec<_>>()
|
||||
.join(":");
|
||||
self.set_tlsext_use_srtp(&profiles_str).unwrap();
|
||||
match tokio_openssl::accept(&self.build(), stream.compat()).await {
|
||||
Ok(stream) => Ok(stream.compat()),
|
||||
Err(_) => Err(io::Error::new(io::ErrorKind::Other, "handshake error")),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<S: AsyncRead + AsyncWrite + Unpin> Dtls<S> for Compat<SslStream<Compat<S>>> {
|
||||
fn is_server_side(&self) -> bool {
|
||||
self.get_ref().ssl().is_server()
|
||||
}
|
||||
|
||||
fn export_key(&mut self, exporter_label: &str, length: usize) -> Vec<u8> {
|
||||
let mut vec = vec![0; length];
|
||||
self.get_mut()
|
||||
.ssl()
|
||||
.export_keying_material(&mut vec, exporter_label, None)
|
||||
.unwrap();
|
||||
vec
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use crate::rfc5764::test::DummyTransport;
|
||||
use crate::rfc5764::DtlsSrtp;
|
||||
|
||||
use futures::FutureExt;
|
||||
use openssl::asn1::Asn1Time;
|
||||
use openssl::hash::MessageDigest;
|
||||
use openssl::pkey::PKey;
|
||||
use openssl::rsa::Rsa;
|
||||
use openssl::ssl::{SslAcceptor, SslConnector, SslMethod, SslVerifyMode};
|
||||
use openssl::x509::X509;
|
||||
use std::task::{Context, Poll};
|
||||
|
||||
#[test]
|
||||
fn connect_and_establish_matching_key_material() {
|
||||
let (client_sock, server_sock) = DummyTransport::new();
|
||||
|
||||
let rsa = Rsa::generate(2048).unwrap();
|
||||
let key = PKey::from_rsa(rsa).unwrap();
|
||||
|
||||
let mut cert_builder = X509::builder().unwrap();
|
||||
cert_builder
|
||||
.set_not_after(&Asn1Time::days_from_now(1).unwrap())
|
||||
.unwrap();
|
||||
cert_builder
|
||||
.set_not_before(&Asn1Time::days_from_now(0).unwrap())
|
||||
.unwrap();
|
||||
cert_builder.set_pubkey(&key).unwrap();
|
||||
cert_builder.sign(&key, MessageDigest::sha256()).unwrap();
|
||||
let cert = cert_builder.build();
|
||||
|
||||
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::dtls()).unwrap();
|
||||
let mut connector = SslConnector::builder(SslMethod::dtls()).unwrap();
|
||||
acceptor.set_certificate(&cert).unwrap();
|
||||
acceptor.set_private_key(&key).unwrap();
|
||||
acceptor.set_verify(SslVerifyMode::NONE);
|
||||
connector.set_verify(SslVerifyMode::NONE);
|
||||
let handshake_server = DtlsSrtp::handshake(server_sock, acceptor);
|
||||
let handshake_client =
|
||||
DtlsSrtp::handshake(client_sock, connector.build().configure().unwrap());
|
||||
let mut future = futures::future::join(handshake_server, handshake_client).boxed();
|
||||
|
||||
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
|
||||
loop {
|
||||
if let Poll::Ready((server, client)) = future.as_mut().poll(&mut cx) {
|
||||
let server = server.unwrap();
|
||||
let client = client.unwrap();
|
||||
assert_eq!(
|
||||
client.srtp_read_context.master_key,
|
||||
server.srtp_write_context.master_key
|
||||
);
|
||||
assert_eq!(
|
||||
client.srtp_read_context.master_salt,
|
||||
server.srtp_write_context.master_salt
|
||||
);
|
||||
assert_eq!(
|
||||
client.srtp_write_context.master_key,
|
||||
server.srtp_read_context.master_key
|
||||
);
|
||||
assert_eq!(
|
||||
client.srtp_write_context.master_salt,
|
||||
server.srtp_read_context.master_salt
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,6 @@
|
|||
use std::io::{Read, Write};
|
||||
|
||||
use Result;
|
||||
use crate::Result;
|
||||
|
||||
pub trait Packet {}
|
||||
|
||||
|
|
Loading…
Reference in New Issue