shithub: opus

ref: f74bff74f6c3961eb4e3b73cd38a4aff71aff5f4
dir: /dnn/training_tf2/fec_packets.py/

View raw version
import numpy as np



def write_fec_packets(filename, packets, rates=None):
    """Write FEC packets to a binary file.

    File layout (integers are int16 and features are float32, both in the
    machine's byte order):

    * header: version, header_size, num_packets, packet_size, subframe_size,
      subframes_per_packet, num_features (seven int16 values, 14 bytes)
    * for every packet: a two-byte rate followed by the packet's features,
      stored with the order along axis -2 reversed.

    Every header field and every rate is stored as int16, so values outside
    the int16 range cannot be represented in this format.

    Args:
        filename: path of the file to create; it is opened in 'wb' mode, so
            an existing file is overwritten.
        packets: non-empty sequence of arrays. The last two dimensions of the
            first packet provide subframes_per_packet and num_features for
            the header.
        rates: optional sequence holding one rate per packet. When None, a
            rate of 0 is stored for every packet.
    """

    assert np.dtype(np.float32).itemsize == 4
    assert np.dtype(np.int16).itemsize == 2

    # derive some sizes
    num_packets = len(packets)
    subframes_per_packet = packets[0].shape[-2]
    num_features = packets[0].shape[-1]

    # size of float is 4
    subframe_size = num_features * 4
    packet_size = subframe_size * subframes_per_packet + 2  # two bytes for rate

    version = 1
    # header size in bytes: seven int16 fields (version, header_size,
    # num_packets, packet_size, subframe_size, subframes_per_packet,
    # num_features)
    header_size = 14

    header_fields = (
        version,
        header_size,
        num_packets,
        packet_size,
        subframe_size,
        subframes_per_packet,
        num_features,
    )

    with open(filename, 'wb') as f:

        # header
        for field in header_fields:
            f.write(np.int16(field).tobytes())

        # packets
        for i, packet in enumerate(packets):
            # identity check: `rates` may be an array, for which `==` would
            # compare element-wise
            rate = 0 if rates is None else rates[i]
            f.write(np.int16(rate).tobytes())

            # features are stored in reverse order along axis -2
            features = np.flip(packet, axis=-2)
            f.write(features.astype(np.float32).tobytes())
            
        
def read_fec_packets(filename, return_rates=False):
    """Read FEC packets from a binary file.

    The file starts with seven int16 header fields (version, header_size,
    num_packets, packet_size, subframe_size, subframes_per_packet,
    num_features). Each packet that follows consists of a two-byte int16
    rate and subframe_size * subframes_per_packet bytes of float32 features.

    Args:
        filename: path of the file to read.
        return_rates: when True, also return the per-packet rates. Defaults
            to False, in which case only the packets are returned.

    Returns:
        A list with one array of shape (1, subframes_per_packet,
        num_features) per packet, with the stored order along axis -2
        reversed. If return_rates is True, a tuple (packets, rates) is
        returned instead, where rates is a list of Python ints.
    """

    assert np.dtype(np.float32).itemsize == 4
    assert np.dtype(np.int16).itemsize == 2

    with open(filename, 'rb') as f:

        def read_int16():
            # one int16 value, converted to a plain Python int
            return np.frombuffer(f.read(2), dtype=np.int16).item()

        # header; fields with a leading underscore are read only to advance
        # the file position
        _version = read_int16()
        _header_size = read_int16()
        num_packets = read_int16()
        _packet_size = read_int16()
        subframe_size = read_int16()
        subframes_per_packet = read_int16()
        num_features = read_int16()

        packet_shape = (1, subframes_per_packet, num_features)
        payload_size = subframe_size * subframes_per_packet

        # packets
        rates = []
        packets = []
        for _ in range(num_packets):

            # call .item() so that an integer (not a bound method) is stored
            rates.append(read_int16())

            features = np.frombuffer(f.read(payload_size), dtype=np.float32)
            features = np.reshape(features, packet_shape)
            # undo the reversal along axis -2 that is applied on writing
            packets.append(np.flip(features, axis=-2))

    if return_rates:
        return packets, rates
    return packets