From bce68a1e8c999aeb663c675b4b5aa671a4aa877d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Slab=C3=BD?= Date: Sat, 6 Jan 2024 14:33:13 +0100 Subject: [PATCH] Initial code commit --- .gitignore | 1 + Cargo.lock | 25 +++++++++++ Cargo.toml | 9 ++++ LICENSE | 21 +++++++++ src/main.rs | 124 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 180 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 LICENSE create mode 100644 src/main.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..096ac32 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,25 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "canihazudp" +version = "0.1.0" +dependencies = [ + "getopts", +] + +[[package]] +name = "getopts" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "14dbbfd5c71d70241ecf9e6f13737f7b5ce823821063188d7e46c41d371eebd5" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "unicode-width" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ed742d4ea2bd1176e236172c8429aaf54486e7ac098db29ffe6529e0ce50973" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..82ec058 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,9 @@ +[package] +name = "canihazudp" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +getopts = "0.2.21" \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..8c15c5f --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 IIM + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..492205d --- /dev/null +++ b/src/main.rs @@ -0,0 +1,124 @@ +extern crate getopts; + +use std::{net::{UdpSocket, SocketAddr, IpAddr}, collections::HashMap, io::{self, Write}}; +use getopts::Options; +use std::env; + +const MESSAGE_TOKENS: u32 = 10_000; +const REPORT_AFTER: usize = 60 * 10; + +fn print_usage(program: &str, opts: Options) +{ + let brief = format!("Usage: {} [options] -s -p -t :", program); + print!("{}", opts.usage(&brief)); +} + +fn start_forwarding(listen_addr: IpAddr, port: u16, source_address: SocketAddr, target_addr: SocketAddr, buffer_size: usize, subscribe_keyword: Option<&str>, source_keyword: Option<&str>) { + let socket = match UdpSocket::bind(SocketAddr::new(listen_addr, port)) { + Ok(sock) => sock, + Err(e) => panic!("{}", e) + }; + + let mut buffer = vec![0; buffer_size]; + let mut sub_map: HashMap = HashMap::new(); + let mut report_counter = 0; + let mut report_size = 0; + let mut keyword_resend = 0; + + println!("Listening on port {}, forwarding to {}...", port, target_addr); + if let Some(kw) = source_keyword { + println!("Source keyword: {}", kw); + } + if let Some(kw) = subscribe_keyword { + println!("Multicast subscribe keyword: {}", kw); + } + loop { + buffer.fill(0); + + if let Some(kw) = source_keyword { + if keyword_resend <= 0 { + keyword_resend = MESSAGE_TOKENS / 2; + socket.send_to(kw.as_bytes(), source_address).unwrap(); + } else { + keyword_resend -= 1; + } + } + + let (recv_size, src_addr) = socket.recv_from(buffer.as_mut_slice()).unwrap(); + report_size += recv_size; + + if src_addr.ip() == source_address.ip() { + socket.send_to(&buffer[..recv_size], target_addr).unwrap(); + + sub_map.retain(|addr, tokens| { + if let Err(e) = socket.send_to(&buffer, addr) { + eprintln!("Failed to send data to a subscriber! Error: {:?}", e); + return false; + } + *tokens -= 1; + + return *tokens > 0u32; + }); + + print!("."); + io::stdout().flush().unwrap(); + report_counter += 1; + if report_counter % 60 == 0 { + println!(""); + } + if report_counter >= REPORT_AFTER { + println!("Forwarded: {} bytes", report_size); + report_size = 0; + report_counter = 0; + } + } else if let Some(kw) = subscribe_keyword { + let data = String::from_utf8(buffer[..recv_size].to_vec()); + match data { + Ok(s) if s == kw => { + println!("New multicast client subscribed: {}", src_addr); + + sub_map.insert(src_addr, MESSAGE_TOKENS); + } + Ok(s) => println!("Received unrecognized keyword \"{}\" from unexpected source {}.", s, src_addr), + Err(_) => println!("Could not parse keyword from {}", src_addr) + } + } + } +} + +fn main() { + let mut options = Options::new(); + options.optflag("h", "help", "Show this help message."); + options.optopt("s", "source", "Source address to listen for packets from. (required, port only used for source keyword)", "X.X.X.X:PORT"); + options.optopt("p", "port", "Port on which to listen for incoming packets. (required)", "PORT"); + options.optopt("t", "target", "Address to which received packets will be forwarded. (required)", "X.X.X.X:PORT"); + options.optopt("l", "listen_address", "Limit the address on which to listen for packets (default is any)", "X.X.X.X"); + options.optopt("b", "buffer", "Specify the size of the receive buffer in bytes.", "BYTES"); + options.optopt("k", "keyword", "Keyword for multicast subscription.", "KEYWORD"); + options.optopt("w", "source_keyword", "Keyword to send to the source address (used to chain this program).", "KEYWORD"); + + let args: Vec = env::args().collect(); + let program = args[0].clone(); + + let matches = options.parse(args).expect("Error parsing arguments."); + + if matches.opt_present("h") { + print_usage(&program, options); + return; + } + if ! matches.opt_present("s") || ! matches.opt_present("p") || ! matches.opt_present("t") { + println!("ERROR: Missing required options. See help."); + print_usage(&program, options); + return; + } + + let source_addr = matches.opt_str("s").unwrap().parse().expect("Error parsing listen address."); + let target_addr = matches.opt_str("t").unwrap().parse().expect("Error parsing send address."); + let port = matches.opt_get("p").expect("Error parsing port.").expect("Error parsing port."); + let buffer_size = matches.opt_get_default("b", 1024).expect("Error parsing buffer size."); + let keyword: Option = matches.opt_get("k").expect("Error parsing multicast keyword."); + let source_keyword: Option = matches.opt_get("w").expect("Error parsing source keyword."); + let listen_addr = matches.opt_str("l").unwrap_or(String::from("0.0.0.0")).parse().expect("Error parsing listen address."); + + start_forwarding(listen_addr, port, source_addr, target_addr, buffer_size, keyword.as_deref(), source_keyword.as_deref()); +}