分片传输和流量控制 这其实是一个小功能,准备拆开来细谈一下,原来的功能是,整块数据进行上传到网关,然后由网关上传到HDFS,但是现在需要做一个新的解法。
因为原来的功能碰到了上传一个文件,第一个是文件太大了,一次性上传往往会占用太多的时间和空间,如果出现网络抖动,或者文件实在太大了挤满了,都会出现问题。第二个是,没有对网关进行流量控制,如果可能存在一个消息队列,在网络发送数据的时候进行分批次导流,就不会出现流量太大使得网关不能正常工作。
其次就是,网关这里是复数台机器,如果进行升级的时候,一般都是灰度发布,这样会使得大部分流量向一个网关倾斜,所以进行分片传输和导流,还是非常有必要的。
现在的需求是:
将一个大文件,进行切分,比如说切分为每个大小25MB,而且需要通过一个队列进行导流,分批次导流进入网关,再由网关缓存,等数据全部到达后,合并数据块,最后上传HDFS
分片传输功能 客户端 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 async def upload_py_fd (dst_dir_name, dst_basename, f, mode=0o640 ) : Base_Url = "/" def split_file (file, chunk_size=1024 ) : print(f.getbuffer().nbytes) while True : chunk = file.read(chunk_size) if not chunk: break yield chunk random_uuid = uuid.uuid4() if isinstance(f, bytes): f = BytesIO(f) total_size = f.getbuffer().nbytes logger.info("the file byte total size is %s " % total_size) def calculate_md5 (file_object) : hash_object = hashlib.md5() for chunk in iter(lambda : file_object.read(4096 ), b"" ): hash_object.update(chunk) file_object.seek(0 ) return hash_object.hexdigest() md5 = calculate_md5(f) chunk_size = 1024 chunk_size = chunk_size * 25 * 1024 resp_dict = {} for i, chunk in enumerate(split_file(f, chunk_size=chunk_size)): logger.info(f"Chunk index is : {i + 1 } , the chunk is : {chunk} " ) f = BytesIO(chunk) data = { "dzuuid" : str(random_uuid), "dzchunkindex" : str(i), "dztotalfilesize:" : str(len(chunk)), "dzchunksize:" : str(chunk_size), "dztotalchunkcount" : str((total_size - 1 ) // chunk_size + 1 ), "dzchunkbyteoffset" : str(i * chunk_size), "md5" : str(md5), "file" : f, "file_name" : dst_basename, "upload_type" : str(1 ) } logger.info("post data info : %s" % data) http_code, resp_buff = await apost(setting.DOMAIN + Base_Url + dst_dir_name, data=data) logger.info("http code info %s" % http_code) logger.info("resp is about wfs url %s" % resp_buff) if http_code != 200 : raise ErrMsgError("upload version code:%s" % http_code) else : resp_dict.update(resp_buff) return resp_dict
这里的做法是:
客户端会去将一个大的文件首先进行进行分片处理,然后初始化一些变量和参数,包括基本的URL路径、文件的UUID、文件的总大小等。计算文件的MD5哈希值,以确保文件的完整性,这里最后会到服务端去验证整个文件是否完整。
然后,设置分片大小,即将文件分割成多个较小的块。循环处理每个分片,将分片数据和相关信息构建成一个数据字典。
发起异步POST请求,将数据字典作为参数传递给apost函数,用于上传分片数据。
检查HTTP响应的状态码,如果不是200,则抛出异常。如果HTTP响应状态码为200,则将响应结果添加到结果字典中。
循环结束后,返回结果字典,其中包含了每个分片的上传结果
服务端 然后,接收到信息后的做法是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 async def slice_upload_wfs (path) : res = {} req_body = await request.form files = await request.files logger.info("show json req_body:%s, file:%s" % (req_body, files)) file = files.get("file" , None ) if not file: return Response("not file" , 400 , content_type="text/html; charset=UTF-8" ) offset = int(req_body.get("dzchunkbyteoffset" , 0 )) md5 = req_body.get("md5" , "" ) idx = int(req_body.get("dzchunkindex" , 0 )) block_num = int(req_body.get("dztotalchunkcount" , 0 )) uuid = req_body.get("dzuuid" , "" ) name = file.filename logger.info("origin file name info %s " % name) logger.info(" info : %s" % req_body) if name == "file" : name = req_body.get("file_name" , "file" ) logger.info("now , the file name is %s " % name) tmp_full_path = os.path.join(UPLOAD_TEMP_PATH, "%s_%s" % (str(uuid), name)) logger.info(f' slice_upload_wfs Processing {name} ' ) if idx == 0 and os.path.exists(tmp_full_path): os.remove(tmp_full_path) with open(tmp_full_path, "ab" ) as f: f.seek(offset) f.write(file.stream.read()) if idx + 1 == block_num: t1 = time.time() local_md5 = await get_file_md5(tmp_full_path) logger.info("calc md5! cost:%s" % (time.time() - t1,)) if md5 == local_md5: try : upload_type = int(req_body.get("upload_type" , 0 )) if upload_type == 1 : wfs_target_path = os.path.join(WFS_BASE_PATH, path, "%s" % (str(name))) else : wfs_target_path = os.path.join(WFS_BASE_PATH, path, "%s_%s" % (str(int(time.time() * 1000 )), name)) t1 = time.time() await wfs_upload(client, tmp_full_path, wfs_target_path, overwrite=True ) logger.info("end upload! cost:%s" % (time.time()-t1, )) except Exception as e: logger.info("slice_upload_wfs Traceback:%s" % e) return Response(e.args[0 ], 400 , content_type="text/html; charset=UTF-8" ) finally : os.remove(tmp_full_path) res[name] = wfs_target_path else : if os.path.exists(tmp_full_path): os.remove(tmp_full_path) logger.info("Traceback md5 is not equal :%s %s" % (md5, local_md5)) return Response(" %s md5:%s check fail!" % (tmp_full_path, md5), 500 , content_type="text/html; charset=UTF-8" ) return Response(json.dumps(res), 200 , content_type="application/json; charset=UTF-8" )
这段代码是一个异步函数,用于处理分片上传文件到指定路径的功能。下面是代码的详细解释:
首先是通过await request.form获取请求的表单数据,通过await request.files获取上传的文件对象。检查是否存在名为”file”的文件对象,如果不存在则返回一个400状态码的响应。
之后从请求的表单数据中获取偏移量offset、MD5哈希值md5、分片索引idx、总块数block_num和UUIDuuid等信息。这些信息必然不是只有一次的,这里会接受到多次这种信息,每次都用来写入到文件当中
然后,构建临时文件的完整路径tmp_full_path,格式为UUID和文件名的组合。这里需要将每次传输到的信息分片存入。
如果是第一个分片(索引为0)并且临时文件已存在,则删除之前的临时文件。说明这个很可能是失效的。
打开临时文件,将文件指针定位到指定的偏移量,并将分片数据写入文件。
如果是最后一个分片(索引加1等于总块数),则进行以下操作:
计算临时文件的MD5哈希值local_md5。
比较计算得到的MD5哈希值和请求中的MD5哈希值,如果相等则进行上传操作。
根据上传类型upload_type的值,构建目标路径wfs_target_path。
调用wfs_upload函数,将临时文件上传到目标路径。
如果上传成功,则将上传后的文件路径添加到结果字典res中,并删除临时文件。
如果上传失败,则返回一个400状态码的响应。
最后,返回一个200状态码的响应,其中包含了上传结果的JSON格式数据。
这段代码实现了分片上传文件的功能,它通过处理每个分片的数据和相关信息,将分片数据写入临时文件,并在最后一个分片完成时进行上传操作。它还包括了对上传结果的处理和错误处理逻辑。
总体而言还是比较简单的,有些技术看起来比较唬人,其实也很简单。。