分片传输和流量控制

分片传输和流量控制

这其实是一个小功能,准备拆开来细谈一下,原来的功能是,整块数据进行上传到网关,然后由网关上传到HDFS,但是现在需要做一个新的解法。

因为原来的功能碰到了上传一个文件,第一个是文件太大了,一次性上传往往会占用太多的时间和空间,如果出现网络抖动,或者文件实在太大了挤满了,都会出现问题。第二个是,没有对网关进行流量控制,如果可能存在一个消息队列,在网络发送数据的时候进行分批次导流,就不会出现流量太大使得网关不能正常工作。

其次就是,网关这里是复数台机器,如果进行升级的时候,一般都是灰度发布,这样会使得大部分流量向一个网关倾斜,所以进行分片传输和导流,还是非常有必要的。

现在的需求是:

将一个大文件,进行切分,比如说切分为每个大小25MB,而且需要通过一个队列进行导流,分批次导流进入网关,再由网关缓存,等数据全部到达后,合并数据块,最后上传HDFS

分片传输功能

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def upload_py_fd(dst_dir_name, dst_basename, f, mode=0o640):

Base_Url = "/"


def split_file(file, chunk_size=1024):
print(f.getbuffer().nbytes)
while True:
chunk = file.read(chunk_size)
if not chunk:
break
yield chunk

# 生成一个随机的 UUID
random_uuid = uuid.uuid4()
# 如果传过来的是bytes,则需要转为file
if isinstance(f, bytes):
f = BytesIO(f)
total_size = f.getbuffer().nbytes
logger.info("the file byte total size is %s " % total_size)

# md5
def calculate_md5(file_object):
hash_object = hashlib.md5()
for chunk in iter(lambda: file_object.read(4096), b""):
hash_object.update(chunk)
file_object.seek(0) # 将文件指针重置到文件的开始位置
return hash_object.hexdigest()

# 假设你已经有了文件对象 f
md5 = calculate_md5(f)

# 分片大小,可自定义
# 表示1kb
chunk_size = 1024
# 表示 25MB
chunk_size = chunk_size * 25 * 1024

# resp
resp_dict = {}

# 分割文件
for i, chunk in enumerate(split_file(f, chunk_size=chunk_size)):
logger.info(f"Chunk index is : {i + 1}, the chunk is : {chunk}")
f = BytesIO(chunk)

data = {
"dzuuid": str(random_uuid),
"dzchunkindex": str(i),
"dztotalfilesize:": str(len(chunk)),
"dzchunksize:": str(chunk_size),
"dztotalchunkcount": str((total_size - 1) // chunk_size + 1),
"dzchunkbyteoffset": str(i * chunk_size),
"md5": str(md5),
"file": f,
"file_name": dst_basename,
"upload_type": str(1) # 分类型 0代表使用 time.time()_file_name ,1 代表只使用file_name
}

logger.info("post data info : %s" % data)

http_code, resp_buff = await apost(setting.DOMAIN + Base_Url + dst_dir_name, data=data)
logger.info("http code info %s" % http_code)
logger.info("resp is about wfs url %s" % resp_buff)
if http_code != 200:
raise ErrMsgError("upload version code:%s" % http_code)
else:
resp_dict.update(resp_buff)

return resp_dict

这里的做法是:

客户端会去将一个大的文件首先进行进行分片处理,然后初始化一些变量和参数,包括基本的URL路径、文件的UUID、文件的总大小等。计算文件的MD5哈希值,以确保文件的完整性,这里最后会到服务端去验证整个文件是否完整。

然后,设置分片大小,即将文件分割成多个较小的块。循环处理每个分片,将分片数据和相关信息构建成一个数据字典。

发起异步POST请求,将数据字典作为参数传递给apost函数,用于上传分片数据。

检查HTTP响应的状态码,如果不是200,则抛出异常。如果HTTP响应状态码为200,则将响应结果添加到结果字典中。

循环结束后,返回结果字典,其中包含了每个分片的上传结果

服务端

然后,接收到信息后的做法是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
async def slice_upload_wfs(path):
# 鉴权等功能省略

res = {}
req_body = await request.form
files = await request.files
logger.info("show json req_body:%s, file:%s" % (req_body, files))
file = files.get("file", None)
if not file:
return Response("not file", 400, content_type="text/html; charset=UTF-8")
offset = int(req_body.get("dzchunkbyteoffset", 0))
md5 = req_body.get("md5", "")
idx = int(req_body.get("dzchunkindex", 0))
block_num = int(req_body.get("dztotalchunkcount", 0))
uuid = req_body.get("dzuuid", "")
name = file.filename
logger.info("origin file name info %s " % name)
logger.info(" info : %s" % req_body)
if name == "file":
# 如果传有别名,就使用别名,接口适配
name = req_body.get("file_name", "file")
logger.info("now , the file name is %s " % name)

tmp_full_path = os.path.join(UPLOAD_TEMP_PATH, "%s_%s" % (str(uuid), name))
logger.info(f' slice_upload_wfs Processing {name}')
if idx == 0 and os.path.exists(tmp_full_path):
# 文件传输失败后重新上传,清空前面失败的
os.remove(tmp_full_path)
with open(tmp_full_path, "ab") as f:
f.seek(offset)
f.write(file.stream.read())

if idx + 1 == block_num:
t1 = time.time()
local_md5 = await get_file_md5(tmp_full_path)
logger.info("calc md5! cost:%s" % (time.time() - t1,))
if md5 == local_md5:
try:
upload_type = int(req_body.get("upload_type", 0)) # 分类型 0代表使用 time.time()_file_name ,1 代表只使用file_name
if upload_type == 1:
wfs_target_path = os.path.join(WFS_BASE_PATH, path, "%s" % (str(name)))
else:
wfs_target_path = os.path.join(WFS_BASE_PATH, path, "%s_%s" % (str(int(time.time() * 1000)), name))
t1 = time.time()
await wfs_upload(client, tmp_full_path, wfs_target_path, overwrite=True)
logger.info("end upload! cost:%s" % (time.time()-t1, ))
except Exception as e:
logger.info("slice_upload_wfs Traceback:%s" % e)
return Response(e.args[0], 400, content_type="text/html; charset=UTF-8")
finally:
os.remove(tmp_full_path)
res[name] = wfs_target_path
else:
if os.path.exists(tmp_full_path):
# 清理失败的文件
os.remove(tmp_full_path)
logger.info("Traceback md5 is not equal :%s %s" % (md5, local_md5))
return Response(" %s md5:%s check fail!" % (tmp_full_path, md5), 500,
content_type="text/html; charset=UTF-8")
return Response(json.dumps(res), 200, content_type="application/json; charset=UTF-8")

这段代码是一个异步函数,用于处理分片上传文件到指定路径的功能。下面是代码的详细解释:

首先是通过await request.form获取请求的表单数据,通过await request.files获取上传的文件对象。检查是否存在名为”file”的文件对象,如果不存在则返回一个400状态码的响应。

之后从请求的表单数据中获取偏移量offset、MD5哈希值md5、分片索引idx、总块数block_num和UUIDuuid等信息。这些信息必然不是只有一次的,这里会接受到多次这种信息,每次都用来写入到文件当中

然后,构建临时文件的完整路径tmp_full_path,格式为UUID和文件名的组合。这里需要将每次传输到的信息分片存入。

如果是第一个分片(索引为0)并且临时文件已存在,则删除之前的临时文件。说明这个很可能是失效的。

打开临时文件,将文件指针定位到指定的偏移量,并将分片数据写入文件。

如果是最后一个分片(索引加1等于总块数),则进行以下操作:

  • 计算临时文件的MD5哈希值local_md5。
  • 比较计算得到的MD5哈希值和请求中的MD5哈希值,如果相等则进行上传操作。
  • 根据上传类型upload_type的值,构建目标路径wfs_target_path。
  • 调用wfs_upload函数,将临时文件上传到目标路径。
  • 如果上传成功,则将上传后的文件路径添加到结果字典res中,并删除临时文件。
  • 如果上传失败,则返回一个400状态码的响应。

最后,返回一个200状态码的响应,其中包含了上传结果的JSON格式数据。

这段代码实现了分片上传文件的功能,它通过处理每个分片的数据和相关信息,将分片数据写入临时文件,并在最后一个分片完成时进行上传操作。它还包括了对上传结果的处理和错误处理逻辑。

总体而言还是比较简单的,有些技术看起来比较唬人,其实也很简单。。