3

I am getting this response from amazon streaming api. Can anybody please help me on it what thing I am doing wrong over here.

b'\x00\x00\x00\xa3\x00\x00\x00ah\x10k\xe1\x0f:exception-type\x07\x00\x13BadRequestException\r:content-type\x07\x00\x10application/json\r:message-type\x07\x00\texception{"Message":"Unexpected WebSocket frame received."}\xbd\xceK\x8a'

:message-type exception{"Message":"Unexpected WebSocket frame received."}½ÎK

I am using the code present below

Importing all the libraries

import asyncio
import websockets
import json
import sys, os, base64, datetime, hashlib, hmac, urllib
import pyaudio
import struct
import numpy as np
import wave
import argparse
import tempfile
import queue
import sys
import sounddevice as sd
import soundfile as sf
import numpy  # Make sure NumPy is loaded before it is used in the callback
assert numpy  # avoid "imported but unused" message (W0611)

Code for streaming audio using Soundevice

def int_or_str(text):
    """Helper function for argument parsing."""
    try:
        return int(text)
    except ValueError:
        return text


parser = argparse.ArgumentParser(add_help=False)
parser.add_argument('-l', '--list-devices', action='store_true', help='show list of audio devices and exit')
args, remaining = parser.parse_known_args()
if args.list_devices:
    print(sd.query_devices())
    parser.exit(0)
parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter, parents=[parser])
parser.add_argument('filename', nargs='?', metavar='FILENAME',help='audio file to store recording to')
parser.add_argument('-d', '--device', type=int_or_str,help='input device (numeric ID or substring)')
parser.add_argument('-r', '--samplerate', type=int, help='sampling rate')
parser.add_argument('-c', '--channels', type=int, default=1, help='number of input channels')
parser.add_argument('-t', '--subtype', type=str, help='sound file subtype (e.g. "PCM_24")')
args = parser.parse_args(remaining)

q = queue.Queue()



def callback(indata, frames, time, status):
    """This is called (from a separate thread) for each audio block."""
    if status:
        print(status, file=sys.stderr)
    q.put(indata.copy())

Creating URl for connection as required by amazon transcribe

def createPresignedUrl(data):
    method = 'GET'
    service = 'transcribe'
    region = data['region']
    host = 'transcribestreaming.' + region + '.amazonaws.com:8443'
    endpoint = "wss://" + host

    def sign(key, msg):
        return hmac.new(key, msg.encode('utf-8'), hashlib.sha256).digest()

    def getSignatureKey(key, dateStamp, regionName, serviceName):
        kDate = sign(('AWS4' + key).encode('utf-8'), dateStamp)
        kRegion = sign(kDate, regionName)
        kService = sign(kRegion, serviceName)
        kSigning = sign(kService, 'aws4_request')
        return kSigning

    access_key = data['key']
    secret_key = data['secret']
    if access_key == '' or secret_key == '':
        print('No access key is available.')
        sys.exit()

    # Create a date for headers and the credential string
    t = datetime.datetime.utcnow()
    amz_date = t.strftime('%Y%m%dT%H%M%SZ') # Format date as YYYYMMDD'T'HHMMSS'Z'
    datestamp = t.strftime('%Y%m%d') # Date w/o time, used in credential scope

    canonical_uri = '/stream-transcription-websocket'

    canonical_headers = 'host:' + host + '\n'
    signed_headers = 'host'

    algorithm = 'AWS4-HMAC-SHA256'
    credential_scope = datestamp + '/' + region + '/' + service + '/' + 'aws4_request'

    canonical_querystring  = 'X-Amz-Algorithm=AWS4-HMAC-SHA256'
    canonical_querystring += '&X-Amz-Credential=' + urllib.parse.quote_plus(access_key + '/' + credential_scope)
    canonical_querystring += '&X-Amz-Date=' + amz_date
    canonical_querystring += '&X-Amz-Expires=300'
    canonical_querystring += '&X-Amz-SignedHeaders=' + signed_headers
    canonical_querystring += '&language-code=' + data['languageCode']
    canonical_querystring += '&media-encoding=pcm'
    canonical_querystring += '&sample-rate=' + str(data['sampleRate'])

    payload_hash = hashlib.sha256(('').encode('utf-8')).hexdigest()

    # Step 6: Combine elements to create canonical request
    canonical_request = method + '\n' + canonical_uri + '\n' + canonical_querystring + '\n' + canonical_headers + '\n' + signed_headers + '\n' + payload_hash

    string_to_sign = algorithm + '\n' +  amz_date + '\n' +  credential_scope + '\n' +  hashlib.sha256(canonical_request.encode('utf-8')).hexdigest()

    # Create the signing key
    signing_key = getSignatureKey(secret_key, datestamp, region, service)

    # Sign the string_to_sign using the signing_key
    signature = hmac.new(signing_key, (string_to_sign).encode("utf-8"), hashlib.sha256).hexdigest()

    canonical_querystring += '&X-Amz-Signature=' + signature

    request_url = endpoint + canonical_uri + "?" + canonical_querystring

    return request_url

data = {
    'key': 'Add your key',
    'secret': 'Add your secret key',
    'region': 'us-east-1',
    'languageCode': 'en-US',
    'sampleRate': 44100
}

Code for doing PCM encoding

url = createPresignedUrl(data)

# FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 44100
CHUNK = 16000
frames = []

# stream = audio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)

def pcmEncode(in_data):

    offset = 0
    input_len = len(in_data)
    buffer = []

    for i in range(input_len):
        offset += 2
        s = max(-1, min(1, in_data[i]))
        b = (s * 32768) if (s < 0) else (s * 32767)
        buffer.insert(offset, b)
    buffer = np.array(buffer)
    return buffer

def downsampleBuffer(buffer, outputSampleRate = 16000) :
    if outputSampleRate == RATE :
        return buffer
    sampleRateRatio = RATE / outputSampleRate
    newLength = round( len(buffer) / sampleRateRatio )
    result = []
    offsetResult = 0
    offsetBuffer = 0

    while offsetResult < newLength :
        nextOffsetBuffer = round((offsetResult + 1) * sampleRateRatio)
        accum = 0
        count = 0

        len_buffer = nextOffsetBuffer if nextOffsetBuffer < len(buffer) else len(buffer)

        for i in range( offsetBuffer, len_buffer):
            accum += buffer[i]
            count += 1

        result.append(accum / count)
        offsetResult += 1
        offsetBuffer = nextOffsetBuffer


    return result

Establishing a connection with AWS for doing transcription

async def start_stream():

    try:
        connection = websockets.connect(url)
        stream = sd.Stream(samplerate=args.samplerate, blocksize=None, device=args.device,
                           channels=args.channels, dtype=None, latency=None, extra_settings=None,
                           callback=None, finished_callback=None, clip_off=None, dither_off=None,
                           never_drop_input=None, prime_output_buffers_using_stream_callback=None)
        stream.start()
        while True:

            a = stream.read(200)
            buffer = downsampleBuffer(a[0])
            result = pcmEncode(buffer)
            async with connection as ws:
                sent_data = {
                    "headers": {
                        ":message-type": {"type": "string", "value": "event"},
                        ":event-type": {"type": "string", "value": "AudioEvent"},
                        ":content-type": {"type": "string", "value": "application/octet-stream"}
                    },
                    "body": str(result)
                }
                await ws.send(json.dumps(sent_data))
                response = await ws.recv()
                print(response)
                print(response.decode('latin1'))

    except KeyboardInterrupt:

        parser.exit('\nInterrupted by user')

    except Exception as e:

        parser.exit(type(e).__name__ + ': ' + str(e))

    if status:
        parser.exit('Error during playback: ' + str(status))

asyncio.get_event_loop().run_until_complete(start_stream())
stream.stop()
stream.close()
  • I'm having the same issue with my Dart/Flutter code: https://stackoverflow.com/questions/68037614/aws-transcribe-streaming-badrequestexception-could-not-decode-the-audio-stream – Hari Honor Jun 21 '21 at 11:19

1 Answers1

0

I believe now that BadRequestException refers to not having the frame encoded correctly rather than the audio data being wrong. I see a few issues with your code:

  1. You need to encoded the headers/body a special way: https://docs.aws.amazon.com/transcribe/latest/dg/event-stream.html

  2. You need to be very bit-saavy with the buffer you send. The audio needs to be 16bit/unsigned (int)/little-endian (See here). Right now you are just converting a float (is your mic data float??) to a 16bit friendly value but storing it in a buffer who's bit size is determined by the system (prob 32 or 64bit really) and you are encoding that with a JSON string encoder. Not likely to be the correct format after that. Basically you need a buffer library that lets you set the int with a specified bit size (16) and endian value (little). Here's my dart code for example:

for (var i=0; i<audioChunk.length; i++) {
  messageBytes.setInt16(offset, audioChunk[i], Endian.little);
  offset += 2;
}
  1. When you get to the EventStream header encoding mentioned above you have to be careful similarly that those 32bit length integers are BIG endian. Same rules apply. You need to write to a byte buffer in a way that lets you specify bitsize and endian'ness.

The best way to proceed is to write your decode functions which you'll need for the AWS response anyway and then decode your encoded frame and see if it comes out the same. Use test data for the audo like [-32000, -100, 0, 200 31000] or something like that so you can test the endianness, etc. is all correct.

Hari Honor
  • 8,677
  • 8
  • 51
  • 54