I want to write data using multiple threads, with each thread writing data of a given size starting from a given byte. I am sure that the write positions and chunk sizes are correct. When the data chunks arrive in order, the file's hash matches, but when the data chunks arrive out of order, the file is in the wrong order. I have added locks for writing, so why does this still happen?
client:
import os
import requests
from concurrent.futures import ThreadPoolExecutor
import datetime
import threading
print_lock = threading.Lock()
file_write_lock = threading.Lock()
def safe_print(*args, **kwargs):
print_lock.acquire()
print(*args, **kwargs)
print_lock.release()
SERVER_URL = 'http://127.0.0.1:9090'
CHUNK_SIZE = 1024 * 128
MAX_WORKERS = 5
MAX_RETRIES = 3
def download_chunk(session, filesize, url, start_byte, file_path, filename):
get_size = CHUNK_SIZE if not start_byte + CHUNK_SIZE >= filesize else filesize - start_byte
response = session.get(url,
headers={'Range': f'bytes={start_byte}-{start_byte + get_size - 1}', 'X-Filename': filename})
if response.status_code != 200:
return response.text
chunk = response.content
safe_print(start_byte, len(chunk), get_size)
with file_write_lock:
with open(file_path, 'ab') as f:
f.seek(start_byte)
f.write(chunk)
f.flush()
os.fsync(f.fileno())
def download_file(file_path, filename):
if os.path.exists(file_path):
os.remove(file_path)
with requests.Session() as session:
url = f'{SERVER_URL}/download/'
response = session.head(url, headers={'X-Filename': filename})
if response.status_code == 404:
raise FileNotFoundError("File not found on server.")
file_size = int(response.headers.get('File-Size', 0))
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
results = [executor.submit(download_chunk, session, file_size, url, i, file_path, filename)
for i in range(0, file_size, CHUNK_SIZE)]
# for future in results:
# print(future.result())
if __name__ == '__main__':
starttime = datetime.datetime.now()
download_file('testfiles/download.txt', 'rest_api.txt')
endtime = datetime.datetime.now()
print("下载时间", endtime - starttime)
server:
from flask import Flask, request, Response, make_response
import os
app = Flask(__name__)
UPLOAD_FOLDER = 'uploads'
CHUNK_SIZE = 1024 * 128
@app.route('/download/', methods=['GET', 'HEAD'])
def download_file():
filename = request.headers.get('X-Filename', None)
if not filename:
return make_response(('Missing filename in request headers.', 400))
file_path = os.path.join(UPLOAD_FOLDER, filename)
if not os.path.exists(file_path):
return make_response(('File not found.', 404))
file_size = os.path.getsize(file_path)
if request.method == 'HEAD':
response = make_response()
response.headers.set('File-Size', file_size)
return response
bytes_range = request.headers.get('Range', 'bytes=0-').split('=')[1].split('-')
start_byte = int(bytes_range[0])
end_byte = int(bytes_range[1])
if end_byte > file_size:
return make_response(('End byte out of range.', 416))
with open(file_path, 'rb') as f:
f.seek(start_byte)
chunk = f.read(end_byte - start_byte + 1)
headers = {'Content-Disposition': f'attachment; filename={filename}',
'Content-Range': f'bytes {start_byte}-{end_byte}/{file_size}',
'Content-Type': 'application/octet-stream'}
return Response(chunk, headers=headers)
if __name__ == '__main__':
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.run(host='0.0.0.0', port=9090, debug=True)
I have tested with file and got the correct file when the data chunks were written in order.