I intend to implement a "DSP-like" signal processor in Python. It should capture small fragments of audio via ALSA, process them, and then play them back via ALSA.
To get things started, I wrote the following (very simple) code.
```python
import alsaaudio

inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
inp.setchannels(1)
inp.setrate(96000)
inp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
inp.setperiodsize(1920)

outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
outp.setchannels(1)
outp.setrate(96000)
outp.setformat(alsaaudio.PCM_FORMAT_U32_LE)
outp.setperiodsize(1920)

while True:
    l, data = inp.read()
    # TODO: Perform some processing.
    outp.write(data)
```
The problem is that the audio "stutters" and is not gapless. I tried experimenting with the PCM mode, setting it to either PCM_ASYNC or PCM_NONBLOCK, but the problem remains. I think the problem is that samples "between" two subsequent calls to `inp.read()` are lost.
Is there a way to capture audio "continuously" in Python (preferably without the need for too "specific"/"non-standard" libraries)? I'd like the signal to always get captured "in the background" into some buffer, from which I can read the "momentary state", while audio keeps getting captured into the buffer even during the time when I perform my read operations. How can I achieve this?
Even if I use a dedicated process/thread to capture the audio, this process/thread will always at least have to (1) read audio from the source, and (2) then put it into some buffer (from which the "signal processing" process/thread then reads). These two operations will therefore still be sequential in time, and thus samples will get lost. How do I avoid that?
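To make the structure I have in mind concrete, here is a toy sketch without any ALSA calls (all names are made up, and the byte strings just stand in for captured periods): a background thread keeps filling a queue while the main thread is free to read from it whenever it likes.

```python
import threading
try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 name

# A background thread keeps "capturing" periods into a queue while the
# main thread is free to consume them at its own pace.
capture_buffer = queue.Queue()

def capture_loop(n_periods):
    # In the real program this would call inp.read() on an ALSA PCM.
    for i in range(n_periods):
        capture_buffer.put(b"\x00" * 8)  # one dummy period

worker = threading.Thread(target=capture_loop, args=(5,))
worker.daemon = True  # do not keep the interpreter alive on exit
worker.start()
worker.join()

print(capture_buffer.qsize())  # 5 periods captured in the background
```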
Thanks a lot for your advice!
EDIT 2: Now I got it running.
```python
import alsaaudio
from multiprocessing import Process, Queue
import numpy as np
import struct

"""
A class implementing buffered audio I/O.
"""
class Audio:

    """
    Initialize the audio buffer.
    """
    def __init__(self):
        #self.__rate = 96000
        self.__rate = 8000
        self.__stride = 4
        self.__pre_post = 4
        self.__read_queue = Queue()
        self.__write_queue = Queue()

    """
    Reads audio from an ALSA audio device into the read queue.
    Supposed to run in its own process.
    """
    def __read(self):
        inp = alsaaudio.PCM(alsaaudio.PCM_CAPTURE, alsaaudio.PCM_NORMAL)
        inp.setchannels(1)
        inp.setrate(self.__rate)
        inp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
        inp.setperiodsize(self.__rate / 50)

        while True:
            _, data = inp.read()
            self.__read_queue.put(data)

    """
    Writes audio to an ALSA audio device from the write queue.
    Supposed to run in its own process.
    """
    def __write(self):
        outp = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK, alsaaudio.PCM_NORMAL)
        outp.setchannels(1)
        outp.setrate(self.__rate)
        outp.setformat(alsaaudio.PCM_FORMAT_U32_BE)
        outp.setperiodsize(self.__rate / 50)

        while True:
            data = self.__write_queue.get()
            outp.write(data)

    """
    Pre-post data into the output buffer to avoid buffer underrun.
    """
    def __pre_post_data(self):
        zeros = np.zeros(self.__rate / 50, dtype = np.uint32)

        for i in range(0, self.__pre_post):
            self.__write_queue.put(zeros)

    """
    Runs the read and write processes.
    """
    def run(self):
        self.__pre_post_data()
        read_process = Process(target = self.__read)
        write_process = Process(target = self.__write)
        read_process.start()
        write_process.start()

    """
    Reads audio samples from the queue captured from the reading thread.
    """
    def read(self):
        return self.__read_queue.get()

    """
    Writes audio samples to the queue to be played by the writing thread.
    """
    def write(self, data):
        self.__write_queue.put(data)

    """
    Pseudonymize the audio samples from a binary string into an array of integers.
    """
    def pseudonymize(self, s):
        return struct.unpack(">" + ("I" * (len(s) / self.__stride)), s)

    """
    Depseudonymize the audio samples from an array of integers into a binary string.
    """
    def depseudonymize(self, a):
        s = ""

        for elem in a:
            s += struct.pack(">I", elem)

        return s

    """
    Normalize the audio samples from an array of integers into an array of floats with unity level.
    """
    def normalize(self, data, max_val):
        data = np.array(data)
        bias = int(0.5 * max_val)
        fac = 1.0 / (0.5 * max_val)
        data = fac * (data - bias)
        return data

    """
    Denormalize the data from an array of floats with unity level into an array of integers.
    """
    def denormalize(self, data, max_val):
        bias = int(0.5 * max_val)
        fac = 0.5 * max_val
        data = np.array(data)
        data = (fac * data).astype(np.int64) + bias
        return data


debug = True
audio = Audio()
audio.run()

while True:
    data = audio.read()
    pdata = audio.pseudonymize(data)

    if debug:
        print "[PRE-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata))

    ndata = audio.normalize(pdata, 0xffffffff)

    if debug:
        print "[PRE-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata))
        print "[PRE-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata)))))

    #ndata += 0.01 # When I comment in this line, it wreaks complete havoc!

    if debug:
        print "[POST-NORMALIZED] Level: " + str(int(10.0 * np.log10(np.max(np.absolute(ndata)))))
        print "[POST-NORMALIZED] Min: " + str(np.min(ndata)) + ", Max: " + str(np.max(ndata))

    pdata = audio.denormalize(ndata, 0xffffffff)

    if debug:
        print "[POST-PSEUDONYMIZED] Min: " + str(np.min(pdata)) + ", Max: " + str(np.max(pdata))
        print ""

    data = audio.depseudonymize(pdata)
    audio.write(data)
```
However, as soon as I perform even the slightest modification of the audio data (e.g. comment that line in), I get a lot of noise and extreme distortion at the output. It seems like I don't handle the PCM data correctly. The strange thing is that the output of the "level meter", etc. all appears to make sense. However, the output is completely distorted (but continuous) when I offset it just slightly.
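For what it's worth, the normalize/denormalize pair itself round-trips cleanly in isolation, which already hinted that the arithmetic was fine and the fault lay elsewhere. A standalone check using the same formulas as above (numpy only, no ALSA; the sample values are made up):

```python
import numpy as np

max_val = 0xffffffff  # full scale of an unsigned 32-bit sample

def normalize(data, max_val):
    # Map unsigned integers onto [-1.0, 1.0] around the midpoint.
    data = np.array(data)
    bias = int(0.5 * max_val)
    fac = 1.0 / (0.5 * max_val)
    return fac * (data - bias)

def denormalize(data, max_val):
    # Inverse mapping back to unsigned integer range.
    bias = int(0.5 * max_val)
    fac = 0.5 * max_val
    data = np.array(data)
    return (fac * data).astype(np.int64) + bias

samples = np.array([0, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff],
                   dtype=np.int64)
roundtrip = denormalize(normalize(samples, max_val), max_val)

# The round-trip error stays within the off-by-one that the truncating
# astype() conversion can introduce.
print(np.max(np.abs(roundtrip - samples)))
```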
EDIT 3: I just found out that my algorithms (not included here) work correctly when I apply them to wave files. So the problem really appears to boil down to the ALSA API.
EDIT 4: I finally found the problems. They were the following.
1st - ALSA quietly "fell back" to PCM_FORMAT_U8_LE upon requesting PCM_FORMAT_U32_LE, so I interpreted the data incorrectly by assuming each sample was 4 bytes wide. It works when I request PCM_FORMAT_S32_LE instead.
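This kind of misinterpretation is easy to reproduce without any audio hardware: if the device silently delivers 8-bit samples but the code assumes 4 bytes per sample, the apparent frame count comes out wrong by a factor of four. A standalone illustration (pure `struct`; the period length is made up):

```python
import struct

# Suppose ALSA silently delivered unsigned 8-bit samples (one byte each)...
period_frames = 160
raw = struct.pack("<" + "B" * period_frames, *([128] * period_frames))

# ...but the code decodes them assuming PCM_FORMAT_U32_LE (4 bytes each).
assumed_frames = len(raw) // 4

# 40 instead of 160: every decoded "sample" actually mashes together
# four real samples, which garbles the signal completely.
print(assumed_frames)  # 40
```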
2nd - The ALSA output seems to expect the period size in bytes, even though the specification explicitly states that it is expected in frames. So you have to set the period size four times as high for the output if you use a 32-bit sample depth.
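Expressed as code, the workaround amounts to scaling the frame count by the frame size in bytes (the helper name is mine, and this reflects the behaviour I observed, not documented API semantics):

```python
BYTES_PER_SAMPLE = 4   # 32-bit samples
CHANNELS = 1           # mono stream

def playback_periodsize(frames):
    # Observed workaround: the playback side wanted bytes, not frames,
    # so multiply the frame count by the frame size in bytes.
    return frames * BYTES_PER_SAMPLE * CHANNELS

# 160 frames per period (rate / 50 at 8000 Hz):
print(playback_periodsize(160))  # 640
```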
3rd - Even in Python (where a "Global Interpreter Lock" exists), processes are slow compared to threads. You can reduce latency a lot by changing to threads, since the I/O threads basically don't do anything computationally intensive.
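The switch is mechanical in a code base like the one above: swap `Process` for `Thread`, and a plain `queue.Queue` then suffices in place of `multiprocessing.Queue`, avoiding the pickling and inter-process transfer overhead. A minimal sketch of the pattern (dummy data instead of ALSA periods; names are illustrative):

```python
from threading import Thread
try:
    import queue            # Python 3 name
except ImportError:
    import Queue as queue   # Python 2 name

# With threads, a plain queue.Queue is enough: no pickling, no IPC.
q = queue.Queue()

def io_worker(items):
    # Stand-in for the ALSA read loop. Blocking I/O releases the GIL,
    # so the GIL is not an obstacle for this kind of worker.
    for item in items:
        q.put(item)

t = Thread(target=io_worker, args=([b"period1", b"period2"],))
t.daemon = True  # I/O thread must not keep the program alive
t.start()
t.join()

print(q.qsize())  # 2
```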