语音隐写术之LSB

最近工作需要实现一个音频水印功能。不得不说大模型确实提高了效率,使用大模型提供了完整的代码框架,代码质量很高只有几处错误,修复后就可以直接使用。相关代码如下,需求的自取。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import numpy as np
import wave

def encode_watermark(watermark_data, bits_per_sample):
    """
    将水印数据编码为二进制位数组,每个样本使用bits_per_sample位。
    """
    # 假设watermark_data是一个字符串,我们首先将其转换为字节数组
    # 然后,将字节数组转换为二进制位数组
    watermark_bits = []
    for byte in watermark_data.encode('utf-8'):
        for _ in range(8):
            watermark_bits.append(byte & 1)
            byte >>= 1
    
    # 截断或填充以匹配所需的位数(如果需要)
    # 这里我们简化处理,假设watermark_bits长度合适
    return watermark_bits

def decode_watermark(watermark_bits, bits_per_sample):
    """
    将二进制位数组解码为水印数据字符串,假设每个样本使用bits_per_sample位(但在此函数中我们忽略这个参数,因为解码时不需要知道这个信息)。
    """
    # 忽略bits_per_sample参数,因为我们假设watermark_bits已经具有正确的长度
    # 初始化一个空的字节列表
    bytes_list = []
    for i in range(0, len(watermark_bits), 8):
        byte_value = 0
        for j in range(8):
            if i + j < len(watermark_bits):
                byte_value |= watermark_bits[i + j] << (j)
        bytes_list.append(byte_value)
    
    # 将字节列表解码为字符串
    return bytes(bytes_list).decode('utf-8')

def embed_watermark(audio_file, watermark_data, output_file, bits_per_sample=1):
    """
    将水印嵌入到音频文件中。
    
    :param audio_file: 输入音频文件名
    :param watermark_data: 要嵌入的水印数据(字符串)
    :param output_file: 输出音频文件名
    :param bits_per_sample: 每个音频样本中用于水印的位数(默认为1)
    """
    # 读取音频文件
    with wave.open(audio_file, 'rb') as wav_file:
        n_channels, sample_width, framerate, nframes = wav_file.getparams()[:4]
        
        # 假设音频是单声道的
        if n_channels != 1:
            raise ValueError("Audio must be mono channel.")
        
        # 读取音频数据
        audio_data = np.frombuffer(wav_file.readframes(nframes), dtype=np.int16)
        
        # 编码水印数据
        watermark_bits = encode_watermark(watermark_data, bits_per_sample * len(audio_data))
        
        # 如果水印位数超过音频样本数,则截断水印
        if len(watermark_bits) > len(audio_data) * bits_per_sample:
            watermark_bits = watermark_bits[:len(audio_data) * bits_per_sample]
        audio_data_copy = audio_data.copy()
        # 嵌入水印
        for i, sample in enumerate(audio_data):
            # 获取当前样本的最低有效位(或几位)
            lsb_mask = (1 << bits_per_sample) - 1
            original_lsb = sample & lsb_mask
            
            # 计算要嵌入的水印位
            if(i * bits_per_sample >= len(watermark_bits)):
                watermark_bit = 0
            else:
                watermark_bit = watermark_bits[i * bits_per_sample]
            watermark_bits_to_embed = (original_lsb & ~lsb_mask) | (watermark_bit << (bits_per_sample - 1))
            
            # 更新样本的最低有效位
            audio_data_copy[i] = (sample & ~lsb_mask) | watermark_bits_to_embed
        
        # 将处理后的音频数据保存为文件
        with wave.open(output_file, 'wb') as output_wav:
            output_wav.setnchannels(n_channels)
            output_wav.setsampwidth(sample_width)
            output_wav.setframerate(framerate)
            output_wav.writeframes(audio_data_copy.astype(np.int16).tobytes())

def extract_watermark(audio_file, bits_per_sample=1):
    """
    从音频文件中提取水印。
    
    :param audio_file: 包含水印的音频文件名
    :param bits_per_sample: 每个音频样本中用于水印的位数(默认为1)
    :return: 提取的水印数据(字符串)
    """
    # 读取音频文件
    with wave.open(audio_file, 'rb') as wav_file:
        n_channels, sample_width, framerate, nframes = wav_file.getparams()[:4]
        
        # 假设音频是单声道的
        if n_channels != 1:
            raise ValueError("Audio must be mono channel.")
        
        # 读取音频数据
        audio_data = np.frombuffer(wav_file.readframes(nframes), dtype=np.int16)
        
        # 初始化提取的水印位数组
        watermark_bits = []
        
        # 遍历音频样本以提取水印位
        for sample in audio_data:
            # 获取当前样本的最低有效位(或几位)
            lsb_mask = (1 << bits_per_sample) - 1
            watermark_bit = (sample & lsb_mask) >> (bits_per_sample - 1)  # 提取最低位
            watermark_bits.append(watermark_bit)
        
        watermark_data = decode_watermark(watermark_bits, bits_per_sample)
        
        return watermark_data

# 使用示例
watermark_data = "shuiyin"  # 要嵌入的水印数据
audio_file = "100024_10.wav"  # 输入音频文件名
output_file = "watermarked.wav"  # 输出音频文件名
embed_watermark(audio_file, watermark_data, output_file, bits_per_sample=1)
extracted_watermark = extract_watermark(output_file, bits_per_sample=1)
print("Extracted watermark:", extracted_watermark)