Software developers often need to create tools to test their applications during development. While the software I develop typically runs on Linux or QNX, I have also built tools using Visual Studio on Windows for interfaces like RS232/RS422/RS485 and Ethernet multicast/unicast.
Recently, I needed to test a unicast
application. We had original communication data captured by tcpdump (stored as a .pcap file viewable in
Wireshark). Instead of manually extracting raw data from the .pcap file to feed into my test tool, I wondered: Could
I directly read the Wireshark data file and transmit packets based on their
original timestamps?
The answer is yes—using Python! In this
article, I’ll share a tool that enables testing applications with live network
data captured from running systems.
What the Tool Can Do
·
Read .pcap files:
Parse packet capture files generated by tools like Wireshark or tcpdump.
·
Filter
specified packets: Selectively transmit
packets based on destination port.
·
Respect
timestamps: Ensure packet
transmission adheres to original timing, with a maximum interval (5 seconds
between packets).
·
Customize
source/destination IPs:
Modify IP headers to reroute packets as needed.
·
Log
activity: Save transmitted
packets and basic statistics (e.g., packet count, errors) to a text file.
·
Extract
packets: Convert .pcap data into a readable text format for analysis.
Challenges Encountered
1. Truncated .pcap files:
Default tcpdump settings vary across operating systems,
often truncating packet lengths. To capture full packets, use -s 0:
tcpdump
-s 0 -i fec0 -w radio.pcap "port 54037"
2. Packets not received by the target:
o MAC address issues: Unspecified or incorrect MAC addresses can
prevent packets from reaching the target device.
o OS packet filtering: Some operating systems may drop packets with
mismatched headers (e.g., unexpected source IPs).
How to Use the Tool
1.
Install
Python 3.10+ on Windows if
not already installed.
2. Install Scapy:
pip
install scapy
3.
Configure
the script: Open pcapsender_udp.py and update the IFACE_NAME variable to match your system’s network adapter.
4. Run the tool:
py
pcapsender_udp.py --pcap radio.pcap --src-ip 192.168.101.188 --dst-ip 192.168.101.1
--dst-port 54037
Python Source Code pcapsender_udp.py
import argparse
from scapy.all import *
import time
from datetime import datetime
# --- Parse command-line arguments ---
parser = argparse.ArgumentParser(description="PCAP UDP replayer with dynamic MAC resolution.")
parser.add_argument("--pcap", required=True, help="Path to the PCAP
file to replay")
parser.add_argument("--src-ip", required=True, help="Custom source IP
address")
parser.add_argument("--dst-ip", required=True, help="Custom destination
IP address")
parser.add_argument("--dst-port", type=int, help="Filter packets by
destination port (optional)")
parser.add_argument("--info", action="store_true", help="Show extra debug
info")
args = parser.parse_args()
PCAP_FILE = args.pcap
CUSTOM_SRC_IP = args.src_ip
CUSTOM_DST_IP = args.dst_ip
DEST_PORT = args.dst_port
# Set your network interface name here
IFACE_NAME = "Intel(R) Ethernet Connection (10) I219-V" # Replace with your NIC
name on Windows or eth0/en0/etc on Linux/Mac
# --- Resolve MAC address of target IP ---
def resolve_mac(ip, iface):
# Send ARP request to resolve MAC address for the given IP
arp_request = Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=ip)
response = srp1(arp_request, iface=iface, timeout=2, verbose=False)
if response:
return response[Ether].src
return None
dst_mac = resolve_mac(CUSTOM_DST_IP, IFACE_NAME)
if not dst_mac:
print(f"Could not resolve MAC for {CUSTOM_DST_IP}. Target might be
offline.")
exit(1)
# --- Load pcap ---
packets = rdpcap(PCAP_FILE)
# Filter packets if DEST_PORT is provided
if DEST_PORT:
filtered_packets = [pkt for pkt in packets if pkt.haslayer(UDP) and pkt[UDP].dport == DEST_PORT]
if not filtered_packets:
print(f"No packets found
with destination port {DEST_PORT}.")
exit(1)
packets = filtered_packets
# Store timestamps and packets for replay
timestamps = [(pkt.time, pkt) for pkt in packets]
script_start_time = time.time()
log_file = open("send.log", "w")
# --- Log stream info ---
first_pkt = timestamps[0][1]
if first_pkt.haslayer(IP):
src_ip = first_pkt[IP].src
dst_ip = first_pkt[IP].dst
else:
src_ip = dst_ip = "Unknown"
port_info = ""
if first_pkt.haslayer(UDP):
port_info = f"UDP {first_pkt[UDP].sport} -> {first_pkt[UDP].dport}"
elif first_pkt.haslayer(TCP):
port_info = f"TCP {first_pkt[TCP].sport} -> {first_pkt[TCP].dport}"
print(f"\n==> Stream Info: from {src_ip} to {dst_ip}, {port_info}, sending via interface '{IFACE_NAME}' to MAC {dst_mac}\n")
log_file.write(f"Stream Info: from {src_ip} to {dst_ip}, {port_info}, to MAC {dst_mac}\n")
# --- Replay packets ---
prev_send_time = None
last_frame_sent = None
for i, (timestamp, pkt) in enumerate(timestamps):
# Create a copy to avoid modifying the original packet
modified_pkt = pkt.copy()
# Strip existing Ethernet layer if present
if modified_pkt.haslayer(Ether):
modified_pkt = modified_pkt[Ether].payload
# Modify IP addresses
if modified_pkt.haslayer(IP):
modified_pkt[IP].src = CUSTOM_SRC_IP
modified_pkt[IP].dst = CUSTOM_DST_IP
del modified_pkt[IP].chksum # Remove checksum so Scapy
recalculates it
# Recalculate transport layer checksums
if modified_pkt.haslayer(UDP):
del modified_pkt[UDP].chksum
elif modified_pkt.haslayer(TCP):
del modified_pkt[TCP].chksum
# Add new Ethernet layer with resolved MAC
ether = Ether(dst=dst_mac)
full_pkt = ether / modified_pkt
# Handle timing: sleep to match original packet timing (max 5
seconds)
if i > 0:
prev_timestamp = timestamps[i - 1][0]
time_diff = timestamp - prev_timestamp
sleep_time = min(max(time_diff, 0), 5.0)
time.sleep(float(sleep_time))
actual_send_time = time.time()
time_diff_ms = int((actual_send_time - prev_send_time) * 1000) if prev_send_time else 0
prev_send_time = actual_send_time
last_frame_sent = i + 1
# Log payload in hex if present
if pkt.haslayer(Raw):
payload = pkt[Raw].load
payload_hex = ' '.join(f"{b:02x}" for b in payload)
else:
payload_hex = "No payload"
# Send the packet on the specified interface
sendp(full_pkt, iface=IFACE_NAME, verbose=False)
# Log send time and payload
dt = datetime.fromtimestamp(actual_send_time)
milliseconds = dt.microsecond // 1000
log_line = (f"Sent at: {dt.strftime('%Y-%m-%d %H:%M:%S')}.{milliseconds:03d}ms, "
f"{last_frame_sent}, {time_diff_ms}ms, Payload: [{payload_hex}]")
print(log_line)
log_file.write(log_line + "\n")
# --- Summary ---
total_runtime = time.time() - script_start_time
h, m, s = int(total_runtime // 3600), int((total_runtime % 3600) // 60), int(total_runtime % 60)
print(f"\nTotal script run time: {h}h{m}m{s}s")
log_file.write(f"\nTotal run time: {h}h{m}m{s}s\n")
log_file.close()
Key Takeaways
This tool simplifies testing by replaying real-world network traffic with
precise timing (maximum 5 seconds interval) and customizable headers. By
leveraging Scapy’s .pcap parsing and packet manipulation
capabilities, developers can streamline validation workflows for unicast
applications.
Feel free to copy, modify, and adapt this tool
to your specific needs – whether you're simulating network load, debugging
protocol implementations, or replaying production traffic in staging
environments. Customizing this solution will not only boost your productivity
but also significantly accelerate your testing cycles, helping you validate
systems faster while maintaining the timing integrity and behavioral patterns
of real-world network interactions. Remember, the best test tools are those
that evolve with your unique requirements!