
Real-time signal processing in Python - how to capture audio continuously?


I intend to implement a "DSP-like" signal processor in Python. It should capture small chunks of audio via ALSA, process them, and then play them back via ALSA.

To get started, I wrote the following (very simple) code.

import alsaaudio

# Capture device: mono, 96 kHz, 32-bit samples, 1920 frames (20 ms) per period
inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
inp.setchannels(1)
inp.setrate(96000)
inp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
inp.setperiodsize(1920)

# Playback device with the same settings
outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
outp.setchannels(1)
outp.setrate(96000)
outp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
outp.setperiodsize(1920)

while True:
    l, data = inp.read()  # blocks until one full period has been captured
    # TODO: Perform some processing.
    outp.write(data)

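The problem is that the audio "stutters" and is not gapless. I tried setting the PCM mode to PCM_ASYNC or PCM_NONBLOCK, but the problem persists. I think the problem is that samples get "lost" between two successive calls to "inp.read()".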

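Is there a way to capture audio "continuously" in Python (preferably without needing overly "specific" or "non-standard" libraries)? I would like the signal to always be captured "in the background" into some buffer, from which I can read the current "instantaneous state", while audio keeps being captured into the buffer even during the time I spend performing the read. How can I achieve this?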
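A common structure for such a "background" buffer is a fixed-size ring buffer: the capture side keeps appending the newest samples and overwrites the oldest ones, and the processing side copies out a snapshot whenever it needs the current state. Below is a minimal sketch, assuming mono float samples held in a NumPy array. `RingBuffer` is a hypothetical class (not part of ALSA or pyalsaaudio), and the `threading.Lock` keeps a reader from seeing a half-written update.

```python
import threading

import numpy as np


class RingBuffer:
    """Hypothetical fixed-size ring buffer holding the most recent samples."""

    def __init__(self, capacity):
        self._data = np.zeros(capacity, dtype=np.float64)
        self._capacity = capacity
        self._pos = 0      # index where the next sample will be written
        self._filled = 0   # number of valid samples (never more than capacity)
        self._lock = threading.Lock()

    def write(self, samples):
        """Append samples, overwriting the oldest ones once the buffer is full."""
        samples = np.asarray(samples, dtype=np.float64)
        if len(samples) >= self._capacity:
            samples = samples[-self._capacity:]  # only the newest samples can fit
        with self._lock:
            n = len(samples)
            end = self._pos + n
            if end <= self._capacity:
                self._data[self._pos:end] = samples
            else:
                # Split the write across the end of the array (wrap-around).
                first = self._capacity - self._pos
                self._data[self._pos:] = samples[:first]
                self._data[:n - first] = samples[first:]
            self._pos = end % self._capacity
            self._filled = min(self._filled + n, self._capacity)

    def snapshot(self):
        """Return a copy of the valid samples, oldest first."""
        with self._lock:
            if self._filled < self._capacity:
                return self._data[:self._filled].copy()
            return np.concatenate((self._data[self._pos:], self._data[:self._pos]))


if __name__ == "__main__":
    buf = RingBuffer(4)
    buf.write([1, 2, 3])
    buf.write([4, 5])
    print(buf.snapshot())  # [2. 3. 4. 5.]
```

In a real program a capture thread would call `write()` with each period it reads, and the processing code would call `snapshot()` without ever blocking the capture for longer than one copy.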

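Even if I use a dedicated process/thread to capture the audio, this process/thread will always have to at least (1) read audio from the source and (2) then put it into some buffer (from which the "signal processing" process/thread then reads). These two operations still happen one after the other, so samples will be lost. How can I avoid this?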
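This gap is normally absorbed by the driver: ALSA captures into its own ring buffer (typically several periods long) independently of the Python code, so the time a capture thread spends in `put()` costs nothing as long as the loop keeps up on average. Samples are only lost when that buffer overflows before the next `read()`, and pyalsaaudio's `read()` reports that case by returning a negative length (`-EPIPE`). The sketch below counts such overruns instead of throwing the length away. `FakePCM` and `capture_loop` are hypothetical names; `FakePCM` only stands in for `alsaaudio.PCM` so the sketch runs without a sound card.

```python
import errno
import queue
import threading


class FakePCM:
    """Hypothetical stand-in for alsaaudio.PCM that replays canned (length, data) results."""

    def __init__(self, results):
        self._results = list(results)

    def read(self):
        return self._results.pop(0)


def capture_loop(pcm, out_queue, periods, stats):
    """Read `periods` periods from `pcm` and queue the captured bytes.

    A negative length is counted as an overrun (pyalsaaudio reports it as -EPIPE);
    any data returned alongside it is still queued.
    """
    for _ in range(periods):
        length, data = pcm.read()
        if length < 0:
            stats["overruns"] += 1  # samples were dropped before this read
        if data:
            out_queue.put(data)


q = queue.Queue()
stats = {"overruns": 0}
pcm = FakePCM([(2, b"\x01\x02"), (-errno.EPIPE, b""), (2, b"\x03\x04")])
worker = threading.Thread(target=capture_loop, args=(pcm, q, 3, stats), daemon=True)
worker.start()
worker.join()
print(stats["overruns"], q.qsize())  # 1 2
```

In the real program `pcm` would be the capture object configured as in the question and the loop would run forever; if the overrun counter stays at zero, no samples are being lost between reads.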

Thanks a lot for your suggestions!

Edit 2: I've got it running now.

import alsaaudio
from multiprocessing import Process, Queue
import numpy as np
import struct

"""
A class implementing buffered audio I/O.
"""
class Audio:

    """
    Initialize the audio buffer.
    """
    def __init__(self):
        #self.__rate = 96000
        self.__rate = 8000
        self.__stride = 4
        self.__pre_post = 4
        self.__read_queue = Queue()
        self.__write_queue = Queue()

    """
    Reads audio from an ALSA audio device into the read queue.
    Supposed to run in its own process.
    """
    def __read(self):
        inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
        inp.setchannels(1)
        inp.setrate(self.__rate)
        inp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
        inp.setperiodsize(self.__rate / 50)

        while True:
            _, data = inp.read()
            self.__read_queue.put(data)

    """
    Writes audio to an ALSA audio device from the write queue.
    Supposed to run in its own process.
    """
    def __write(self):
        outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
        outp.setchannels(1)
        outp.setrate(self.__rate)
        outp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
        outp.setperiodsize(self.__rate / 50)

        while True:
            data = self.__write_queue.get()
            outp.write(data)

    """
    Pre-post data into the output buffer to avoid buffer underrun.
    """
    def __pre_post_data(self):
        zeros = np.zeros(self.__rate / 50, dtype = np.uint32)

        for i in range(0, self.__pre_post):
            self.__write_queue.put(zeros)

    """
    Runs the read and write processes.
    """
    def run(self):
        self.__pre_post_data()
        read_process = Process(target = self.__read)
        write_process = Process(target = self.__write)
        read_process.start()
        write_process.start()

    """
    Reads audio samples from the queue captured from the reading thread.
    """
    def read(self):
        return self.__read_queue.get()

    """
    Writes audio samples to the queue to be played by the writing thread.
    """
    def write(self, data):
        self.__write_queue.put(data)

    """
    Pseudonymize the audio samples from a binary string into an array of integers.
    """
    def pseudonymize(self, s):
        return struct.unpack(">" + ("I" * (len(s) / self.__stride)), s)

    """
    Depseudonymize the audio samples from an array of integers into a binary string.
    """
    def depseudonymize(self, a):
        s = ""

        for elem in a:
            s += struct.pack(">I", elem)

        return s

    """
    Normalize the audio samples from an array of integers into an array of floats with unity level.
    """
    def normalize(self, data, max_val):
        data = np.array(data)
        bias = int(0.5 * max_val)
        fac = 1.0 / (0.5 * max_val)
        data = fac * (data - bias)
        return data

    """
    Denormalize the data from an array of floats with unity level into an array of integers.
    """
    def denormalize(self, data, max_val):
        bias = int(0.5 * max_val)
        fac = 0.5 * max_val
        data = np.array(data)
        data = (fac * data).astype(np.int64) + bias
        return data

if __name__ == "__main__":
    debug = True
    audio = Audio()
    audio.run()

    while True:
        data = audio.read()
        pdata = audio.pseudonymize(data)

        if debug:
            print("[PRE-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata)))

        ndata = audio.normalize(pdata, 0xffffffff)

        if debug:
            print("[PRE-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata)))
            print("[PRE-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata))))))

        #ndata += 0.01 # When I uncomment this line, it wreaks complete havoc!

        if debug:
            print("[POST-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata))))))
            print("[POST-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata)))

        pdata = audio.denormalize(ndata, 0xffffffff)

        if debug:
            print("[POST-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata)))
            print("")

        data = audio.depseudonymize(pdata)
        audio.write(data)

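However, as soon as I make even the slightest modification to the audio data (for example, uncommenting the marked line), I get a lot of noise and extreme distortion at the output. It looks as if I am not handling the PCM data correctly. The strange thing is that the output of the "level meter" etc. all seems to make sense. But as soon as I offset the signal slightly, the output is completely distorted (though continuous).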

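Edit 3: I just found out that my algorithms (not included here) work when I apply them to wave files. So the problem actually seems to come down to the ALSA API.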

Edit 4: I finally found the problems. They are the following.

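1st - When I requested PCM_FORMAT_U32_LE, ALSA silently "fell back" to an 8-bit unsigned format (PCM_FORMAT_U8), so I misinterpreted the data by assuming every sample was 4 bytes wide. It works when I request PCM_FORMAT_S32_LE.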
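Once the device really delivers PCM_FORMAT_S32_LE, the per-sample `struct` packing and the unsigned bias from the code above are no longer needed: NumPy can view the raw buffer directly as little-endian signed 32-bit integers. The helpers below are a sketch under that assumption (mono, S32_LE; the function names are hypothetical). The conversion back clips to the valid range first, because a value such as `1.0 + 0.01` would otherwise not fit into a 32-bit integer, which is one way a small offset can turn into loud distortion.

```python
import numpy as np

FULL_SCALE = 2**31  # magnitude of the most negative S32 value


def s32le_to_float(data):
    """Interpret a raw S32_LE byte buffer as floats in [-1.0, 1.0)."""
    samples = np.frombuffer(data, dtype="<i4")
    return samples.astype(np.float64) / FULL_SCALE


def float_to_s32le(samples):
    """Convert floats back to a raw S32_LE byte buffer, clipping anything out of range."""
    scaled = np.clip(np.asarray(samples, dtype=np.float64) * FULL_SCALE,
                     -FULL_SCALE, FULL_SCALE - 1)
    return scaled.astype("<i4").tobytes()
```

With these, a processing loop reduces to converting each period with `s32le_to_float`, working on the floats, and converting back with `float_to_s32le` before writing.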

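2nd - The ALSA output seems to expect the period size in bytes, even though the specification explicitly states that it is expected in frames. So with a 32-bit sample depth, you have to set the output period size four times as large.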
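The arithmetic behind this workaround: a frame holds one sample per channel, so one period occupies frames × channels × bytes per sample bytes. With the values from the code above (8000 Hz, a period of `8000 // 50` = 160 frames, mono, 4-byte samples) that is 640 bytes, which is where the factor of four comes from. The helper `period_bytes` is hypothetical and only spells out the conversion; whether a given pyalsaaudio version really wants bytes here is the author's observation, so it is worth checking against the version in use.

```python
def period_bytes(frames, channels, bytes_per_sample):
    """Size in bytes of one period of `frames` frames."""
    return frames * channels * bytes_per_sample


rate = 8000
frames = rate // 50  # 20 ms per period -> 160 frames
print(frames, period_bytes(frames, channels=1, bytes_per_sample=4))  # 160 640
```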

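3rd - Even in Python (which has a "global interpreter lock"), processes are slow compared to threads here. Switching to threads reduces latency considerably, because the I/O threads hardly perform any computationally intensive work.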
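A minimal sketch of that change, assuming the rest of the design stays as in the question: the two I/O loops run as daemon threads and `queue.Queue` replaces `multiprocessing.Queue`. One plausible contributor to the latency difference is that a `multiprocessing.Queue` pickles every chunk and sends it through a pipe, while a `queue.Queue` just passes a reference. To keep the sketch runnable without ALSA, device access is injected as two hypothetical callables: `capture` returns one period of raw bytes and `play` consumes one. `ThreadedAudio` is likewise a hypothetical name.

```python
import queue
import threading


class ThreadedAudio:
    """Hypothetical thread-based variant of the question's Audio class."""

    def __init__(self, capture, play, pre_post_periods=4, period_bytes=640):
        self._capture = capture  # callable returning one period of raw bytes
        self._play = play        # callable consuming one period of raw bytes
        self._read_queue = queue.Queue()
        self._write_queue = queue.Queue()
        # Pre-post zero-valued periods so playback does not underrun at start-up.
        for _ in range(pre_post_periods):
            self._write_queue.put(bytes(period_bytes))

    def _read_loop(self):
        while True:
            self._read_queue.put(self._capture())

    def _write_loop(self):
        while True:
            self._play(self._write_queue.get())

    def run(self):
        # Daemon threads end together with the main program.
        threading.Thread(target=self._read_loop, daemon=True).start()
        threading.Thread(target=self._write_loop, daemon=True).start()

    def read(self):
        return self._read_queue.get()

    def write(self, data):
        self._write_queue.put(data)
```

In the real program `capture` could be `lambda: inp.read()[1]` and `play` could be `outp.write`, with `inp` and `outp` configured as in the question.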
