Saturday, May 17, 2025

Python Program to Send Wireshark Data Frames

Software developers often need to create tools to test their applications during development. While the software I develop typically runs on Linux or QNX, I have also built tools using Visual Studio on Windows for interfaces like RS232/RS422/RS485 and Ethernet multicast/unicast.

Recently, I needed to test a unicast application. We had original communication data captured by tcpdump (stored as a .pcap file viewable in Wireshark). Instead of manually extracting raw data from the .pcap file to feed into my test tool, I wondered: Could I directly read the Wireshark data file and transmit packets based on their original timestamps?

The answer is yes—using Python! In this article, I’ll share a tool that enables testing applications with live network data captured from running systems.

What the Tool Can Do

·        Read .pcap files: Parse packet capture files generated by tools like Wireshark or tcpdump.

·        Filter specified packets: Selectively transmit packets based on destination port.

·        Respect timestamps: Replay packets with their original inter-packet timing, capping any single gap between packets at 5 seconds.

·        Customize source/destination IPs: Modify IP headers to reroute packets as needed.

·        Log activity: Save transmitted packets and basic statistics (e.g., packet count, errors) to a text file.

·        Extract packets: Convert .pcap data into a readable text format for analysis.
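The timestamp handling above can be sketched without Scapy. This is a minimal illustration, assuming timestamps are plain numbers in seconds; the function name replay_delays and the sample values are hypothetical and not part of the tool itself. Each gap is clamped to the range 0 to 5 seconds, so out-of-order timestamps never produce a negative sleep.

```python
def replay_delays(timestamps, max_gap=5.0):
    """Return the delay to wait before sending each packet.

    The first packet is sent immediately; every later packet waits for the
    original gap to its predecessor, clamped to the range [0, max_gap].
    """
    if not timestamps:
        return []
    delays = [0.0]
    for prev, curr in zip(timestamps, timestamps[1:]):
        gap = float(curr - prev)
        delays.append(min(max(gap, 0.0), max_gap))
    return delays


# Hypothetical capture times in seconds (not taken from a real capture)
capture_times = [100.0, 100.25, 99.0, 130.0]
print(replay_delays(capture_times))  # [0.0, 0.25, 0.0, 5.0]
```

The 31-second gap before the last packet is shortened to 5 seconds, which keeps long idle periods in a capture from stalling a test run.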

Challenges Encountered

1.     Truncated .pcap files: Default tcpdump settings vary across operating systems, often truncating packets. To capture full packets, use -s 0:

tcpdump -s 0 -i fec0 -w radio.pcap "port 54037"

2.     Packets not received by the target:

o   MAC address issues: Unspecified or incorrect MAC addresses can prevent packets from reaching the target device.

o   OS packet filtering: Some operating systems may drop packets with mismatched headers (e.g., unexpected source IPs).
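A related cause of dropped packets is a stale checksum: rewriting the source or destination IP invalidates the IPv4 header checksum (and the UDP checksum, which covers the addresses through a pseudo-header), and receivers generally discard packets whose checksums do not verify. The script avoids this by deleting the chksum fields so that Scapy recomputes them. The sketch below only illustrates why that step matters, using the RFC 1071 checksum over a made-up 20-byte header; the function name and the sample header are assumptions for illustration.

```python
import struct


def ipv4_header_checksum(header: bytes) -> int:
    """RFC 1071 checksum of an IPv4 header.

    The existing checksum field (bytes 10-11) is treated as zero, so the
    function can be applied to a header that already carries a checksum.
    """
    data = header[:10] + b"\x00\x00" + header[12:]
    total = sum(struct.unpack(f"!{len(data) // 2}H", data))
    # Fold any carry bits back into the low 16 bits
    while total >> 16:
        total = (total & 0xFFFF) + (total >> 16)
    return ~total & 0xFFFF


# Illustrative header: UDP from 192.168.0.1 to 192.168.0.199
header = bytes.fromhex("450000730000400040110000c0a80001c0a800c7")
original = ipv4_header_checksum(header)

# Rewriting the source address to 192.168.101.188 changes the checksum,
# so the value stored in the captured packet is no longer valid.
rewritten = header[:12] + bytes([192, 168, 101, 188]) + header[16:]
print(hex(original), hex(ipv4_header_checksum(rewritten)))
```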

How to Use the Tool

1.     Install Python 3.10+ on Windows if not already installed.

2.     Install Scapy (on Windows, Scapy also needs Npcap installed to send packets):

pip install scapy

3.     Configure the script: Open pcapsender_udp.py and update the IFACE_NAME variable to match your system’s network adapter.

4.     Run the tool:

py pcapsender_udp.py --pcap radio.pcap --src-ip 192.168.101.188 --dst-ip 192.168.101.1 --dst-port 54037
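Before replaying a capture, it can help to confirm that it does not suffer from the truncation problem described earlier. The following is a minimal sketch using only the standard library, assuming a classic little-endian .pcap file with microsecond timestamps (not pcapng or the nanosecond variant); the function name count_truncated is hypothetical. Each record header stores both the captured length and the original length, so a packet is truncated when the first is smaller than the second.

```python
import struct

PCAP_MAGIC_LE = 0xA1B2C3D4  # classic pcap, microsecond timestamps


def count_truncated(pcap_bytes: bytes):
    """Return (snaplen, total_packets, truncated_packets).

    Assumes a classic little-endian pcap file held in memory.
    """
    # Global header: magic, version major/minor, thiszone, sigfigs,
    # snaplen, link type (24 bytes in total)
    magic, _, _, _, _, snaplen, _ = struct.unpack_from("<IHHiIII", pcap_bytes, 0)
    if magic != PCAP_MAGIC_LE:
        raise ValueError("not a little-endian classic pcap file")
    offset, total, truncated = 24, 0, 0
    while offset + 16 <= len(pcap_bytes):
        # Record header: ts_sec, ts_usec, captured length, original length
        _, _, incl_len, orig_len = struct.unpack_from("<IIII", pcap_bytes, offset)
        total += 1
        if incl_len < orig_len:
            truncated += 1
        offset += 16 + incl_len
    return snaplen, total, truncated


# Synthetic two-packet capture built in memory for demonstration only
file_header = struct.pack("<IHHiIII", PCAP_MAGIC_LE, 2, 4, 0, 0, 68, 1)
full_record = struct.pack("<IIII", 0, 0, 4, 4) + b"\x00" * 4
cut_record = struct.pack("<IIII", 1, 0, 4, 100) + b"\x00" * 4
print(count_truncated(file_header + full_record + cut_record))  # (68, 2, 1)
```

For a real file, the same function could be called on the bytes read from radio.pcap opened in binary mode.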


Python Source Code pcapsender_udp.py 

import argparse
import sys
import time
from datetime import datetime

from scapy.all import ARP, IP, TCP, UDP, Ether, Raw, rdpcap, sendp, srp1

# --- Parse command-line arguments ---
parser = argparse.ArgumentParser(description="PCAP UDP replayer with dynamic MAC resolution.")
parser.add_argument("--pcap", required=True, help="Path to the PCAP file to replay")
parser.add_argument("--src-ip", required=True, help="Custom source IP address")
parser.add_argument("--dst-ip", required=True, help="Custom destination IP address")
parser.add_argument("--dst-port", type=int, help="Filter packets by destination port (optional)")
parser.add_argument("--info", action="store_true", help="Show extra debug info")
args = parser.parse_args()

PCAP_FILE = args.pcap
CUSTOM_SRC_IP = args.src_ip
CUSTOM_DST_IP = args.dst_ip
DEST_PORT = args.dst_port

# Set your network interface name here
IFACE_NAME = "Intel(R) Ethernet Connection (10) I219-V"  # Replace with your NIC name on Windows or eth0/en0/etc on Linux/Mac


# --- Resolve MAC address of target IP ---
def resolve_mac(ip, iface):
    # Broadcast an ARP request and return the MAC address of the responder
    arp_request = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=ip)
    response = srp1(arp_request, iface=iface, timeout=2, verbose=False)
    if response:
        return response[Ether].src
    return None


dst_mac = resolve_mac(CUSTOM_DST_IP, IFACE_NAME)
if not dst_mac:
    print(f"Could not resolve MAC for {CUSTOM_DST_IP}. Target might be offline.")
    sys.exit(1)

# --- Load pcap ---
packets = rdpcap(PCAP_FILE)

# Filter packets if DEST_PORT is provided
if DEST_PORT is not None:
    packets = [pkt for pkt in packets if pkt.haslayer(UDP) and pkt[UDP].dport == DEST_PORT]
    if not packets:
        print(f"No packets found with destination port {DEST_PORT}.")
        sys.exit(1)

# An empty capture would otherwise fail on the first-packet lookup below
if not packets:
    print(f"No packets found in {PCAP_FILE}.")
    sys.exit(1)

# Store timestamps and packets for replay
timestamps = [(pkt.time, pkt) for pkt in packets]
script_start_time = time.time()
log_file = open("send.log", "w")

# --- Log stream info ---
first_pkt = timestamps[0][1]
if first_pkt.haslayer(IP):
    src_ip = first_pkt[IP].src
    dst_ip = first_pkt[IP].dst
else:
    src_ip = dst_ip = "Unknown"

port_info = ""
if first_pkt.haslayer(UDP):
    port_info = f"UDP {first_pkt[UDP].sport} -> {first_pkt[UDP].dport}"
elif first_pkt.haslayer(TCP):
    port_info = f"TCP {first_pkt[TCP].sport} -> {first_pkt[TCP].dport}"

print(f"\n==> Stream Info: from {src_ip} to {dst_ip}, {port_info}, sending via interface '{IFACE_NAME}' to MAC {dst_mac}\n")
log_file.write(f"Stream Info: from {src_ip} to {dst_ip}, {port_info}, to MAC {dst_mac}\n")

# --- Replay packets ---
prev_send_time = None
last_frame_sent = None

for i, (timestamp, pkt) in enumerate(timestamps):
    # Create a copy to avoid modifying the original packet
    modified_pkt = pkt.copy()

    # Strip existing Ethernet layer if present
    if modified_pkt.haslayer(Ether):
        modified_pkt = modified_pkt[Ether].payload

    # Modify IP addresses
    if modified_pkt.haslayer(IP):
        modified_pkt[IP].src = CUSTOM_SRC_IP
        modified_pkt[IP].dst = CUSTOM_DST_IP
        del modified_pkt[IP].chksum  # Remove checksum so Scapy recalculates it
        # Remove transport layer checksums so they are recalculated as well
        if modified_pkt.haslayer(UDP):
            del modified_pkt[UDP].chksum
        elif modified_pkt.haslayer(TCP):
            del modified_pkt[TCP].chksum

    # Add new Ethernet layer with resolved MAC
    ether = Ether(dst=dst_mac)
    full_pkt = ether / modified_pkt

    # Handle timing: sleep to match original packet timing (max 5 seconds)
    if i > 0:
        prev_timestamp = timestamps[i - 1][0]
        time_diff = timestamp - prev_timestamp
        sleep_time = min(max(time_diff, 0), 5.0)
        time.sleep(float(sleep_time))

    actual_send_time = time.time()
    time_diff_ms = int((actual_send_time - prev_send_time) * 1000) if prev_send_time is not None else 0
    prev_send_time = actual_send_time
    last_frame_sent = i + 1

    # Log payload in hex if present
    if pkt.haslayer(Raw):
        payload = pkt[Raw].load
        payload_hex = ' '.join(f"{b:02x}" for b in payload)
    else:
        payload_hex = "No payload"

    # Send the packet on the specified interface
    sendp(full_pkt, iface=IFACE_NAME, verbose=False)

    # Log send time, frame number, gap since the previous send, and payload
    dt = datetime.fromtimestamp(actual_send_time)
    milliseconds = dt.microsecond // 1000
    log_line = (f"Sent at: {dt.strftime('%Y-%m-%d %H:%M:%S')}.{milliseconds:03d}ms, "
                f"{last_frame_sent}, {time_diff_ms}ms, Payload: [{payload_hex}]")
    print(log_line)
    log_file.write(log_line + "\n")

# --- Summary ---
total_runtime = time.time() - script_start_time
h, m, s = int(total_runtime // 3600), int((total_runtime % 3600) // 60), int(total_runtime % 60)
print(f"\nTotal script run time: {h}h{m}m{s}s")
log_file.write(f"\nTotal run time: {h}h{m}m{s}s\n")
log_file.close()

 


Key Takeaways
This tool simplifies testing by replaying real-world network traffic with its original timing (any gap longer than 5 seconds is shortened to 5 seconds) and customizable IP headers. By leveraging Scapy’s .pcap parsing and packet manipulation capabilities, developers can streamline validation workflows for unicast applications.

Feel free to copy, modify, and adapt this tool to your specific needs – whether you're simulating network load, debugging protocol implementations, or replaying production traffic in staging environments. Customizing this solution will not only boost your productivity but also significantly accelerate your testing cycles, helping you validate systems faster while maintaining the timing integrity and behavioral patterns of real-world network interactions. Remember, the best test tools are those that evolve with your unique requirements!
