0

I want to write data using multiple threads, with each thread writing data of a given size starting from a given byte. I am sure that the write positions and chunk sizes are correct. When the data chunks arrive in order, the file's hash matches, but when the data chunks arrive out of order, the file is in the wrong order. I have added locks for writing, so why does this still happen?

client:

import os
import requests
from concurrent.futures import ThreadPoolExecutor
import datetime
import threading

print_lock = threading.Lock()
file_write_lock = threading.Lock()


def safe_print(*args, **kwargs):
    print_lock.acquire()
    print(*args, **kwargs)

    print_lock.release()


SERVER_URL = 'http://127.0.0.1:9090'
CHUNK_SIZE = 1024 * 128
MAX_WORKERS = 5
MAX_RETRIES = 3


def download_chunk(session, filesize, url, start_byte, file_path, filename):
    get_size = CHUNK_SIZE if not start_byte + CHUNK_SIZE >= filesize else filesize - start_byte
    response = session.get(url,
                           headers={'Range': f'bytes={start_byte}-{start_byte + get_size - 1}', 'X-Filename': filename})
    if response.status_code != 200:
        return response.text
    chunk = response.content
    safe_print(start_byte, len(chunk), get_size)
    with file_write_lock:
        with open(file_path, 'ab') as f:
            f.seek(start_byte)
            f.write(chunk)
            f.flush()
            os.fsync(f.fileno())


def download_file(file_path, filename):
    if os.path.exists(file_path):
        os.remove(file_path)

    with requests.Session() as session:
        url = f'{SERVER_URL}/download/'
        response = session.head(url, headers={'X-Filename': filename})

        if response.status_code == 404:
            raise FileNotFoundError("File not found on server.")

        file_size = int(response.headers.get('File-Size', 0))

        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            results = [executor.submit(download_chunk, session, file_size, url, i, file_path, filename)
                       for i in range(0, file_size, CHUNK_SIZE)]

            # for future in results:
            #     print(future.result())


if __name__ == '__main__':
    starttime = datetime.datetime.now()
    download_file('testfiles/download.txt', 'rest_api.txt')
    endtime = datetime.datetime.now()
    print("下载时间", endtime - starttime)

server:

from flask import Flask, request, Response, make_response
import os

app = Flask(__name__)
UPLOAD_FOLDER = 'uploads'
CHUNK_SIZE = 1024 * 128


@app.route('/download/', methods=['GET', 'HEAD'])
def download_file():
    filename = request.headers.get('X-Filename', None)
    if not filename:
        return make_response(('Missing filename in request headers.', 400))

    file_path = os.path.join(UPLOAD_FOLDER, filename)

    if not os.path.exists(file_path):
        return make_response(('File not found.', 404))

    file_size = os.path.getsize(file_path)

    if request.method == 'HEAD':
        response = make_response()
        response.headers.set('File-Size', file_size)
        return response

    bytes_range = request.headers.get('Range', 'bytes=0-').split('=')[1].split('-')
    start_byte = int(bytes_range[0])
    end_byte = int(bytes_range[1])
    if end_byte > file_size:
        return make_response(('End byte out of range.', 416))

    with open(file_path, 'rb') as f:
        f.seek(start_byte)
        chunk = f.read(end_byte - start_byte + 1)

    headers = {'Content-Disposition': f'attachment; filename={filename}',
               'Content-Range': f'bytes {start_byte}-{end_byte}/{file_size}',
               'Content-Type': 'application/octet-stream'}
    return Response(chunk, headers=headers)


if __name__ == '__main__':
    if not os.path.exists(UPLOAD_FOLDER):
        os.makedirs(UPLOAD_FOLDER)
    app.run(host='0.0.0.0', port=9090, debug=True)

I have tested with file and got the correct file when the data chunks were written in order.

  • It is not possible to simply `seek` to move the write pointer when a file is opened using `a`ppend mode, the `seek` only moves the read pointer (though without `+` reading will fail anyway), all writes will be appended to where the current end of the file. You will want to create the file first and then open the file using the `'r+b'` mode, which will permit seeking anywhere to put the read/write pointer. A [table](https://stackoverflow.com/a/30931305/) that shows the properties of the mode, note the "write after seek" property. – metatoaster Apr 17 '23 at 06:40
  • @metatoaster Thank you so much for your insightful answer. I really appreciate it and it has helped clear up my confusion. – learner L Apr 17 '23 at 06:49

0 Answers0