Skip to content
Snippets Groups Projects
Commit 90c18b5a authored by OZGCloud's avatar OZGCloud
Browse files

OZG-6730 Refactoring

parent 5eb0a40e
Branches
Tags
No related merge requests found
......@@ -44,6 +44,9 @@ const (
fileFormKey = "file"
vorgangIdFormKey = "vorgangId"
contentTypeHeaderKey = "Content-Type"
contentDispositionHeaderKey = "Content-Disposition"
maxFileSizeInMB = 150
fileChunkSizeInByte = 1024 * 1024
)
func RegisterHomeEndpoint(mux *runtime.ServeMux) {
......@@ -86,8 +89,15 @@ func RegisterBinaryFileEndpoints(ctx context.Context, mux *runtime.ServeMux, grp
}
func RegisterFileUploadEndpoint(mux *runtime.ServeMux) {
err := mux.HandlePath("POST", "/api/v1/file", func(w http.ResponseWriter, r *http.Request, _ map[string]string) {
if err := r.ParseMultipartForm(32 << 20); err != nil {
err := mux.HandlePath("POST", "/api/v1/file", fileUploadHandler)
if err != nil {
logger.Fatal("failed to register file upload endpoint: %v", err)
}
}
func fileUploadHandler(w http.ResponseWriter, r *http.Request, _ map[string]string) {
if err := r.ParseMultipartForm(maxFileSizeInMB << 20); err != nil {
http.Error(w, "invalid multipart form", http.StatusBadRequest)
return
}
......@@ -99,20 +109,50 @@ func RegisterFileUploadEndpoint(mux *runtime.ServeMux) {
}
defer file.Close()
conn, err := grpc.NewClient(GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf), grpc.WithTransportCredentials(insecure.NewCredentials()))
client, stream, err := createGrpcStream(r)
if err != nil {
http.Error(w, "gRPC connection error", http.StatusInternalServerError)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
defer conn.Close()
defer client.Close()
if err := sendFileMetadata(stream, r, header); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := sendFileChunks(stream, file); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp, err := stream.CloseAndRecv()
if err != nil {
http.Error(w, "response error", http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(resp.FileId))
}
func createGrpcStream(r *http.Request) (*grpc.ClientConn, pb.BinaryFileService_UploadBinaryFileAsStreamClient, error) {
target := GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf)
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, fmt.Errorf("gRPC connection error")
}
client := pb.NewBinaryFileServiceClient(conn)
stream, err := client.UploadBinaryFileAsStream(r.Context())
if err != nil {
http.Error(w, "stream creation error", http.StatusInternalServerError)
return
return nil, nil, fmt.Errorf("stream creation error")
}
return conn, stream, nil
}
func sendFileMetadata(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, r *http.Request, header *multipart.FileHeader) error {
meta := &pb.GrpcUploadBinaryFileRequest{
Request: &pb.GrpcUploadBinaryFileRequest_Metadata{
Metadata: &pb.GrpcUploadBinaryFileMetaData{
......@@ -122,61 +162,74 @@ func RegisterFileUploadEndpoint(mux *runtime.ServeMux) {
},
},
}
if err := stream.Send(meta); err != nil {
http.Error(w, "metadata send error", http.StatusInternalServerError)
return
return fmt.Errorf("metadata send error")
}
return nil
}
buf := make([]byte, 1024*1024)
func sendFileChunks(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, file multipart.File) error {
buf := make([]byte, fileChunkSizeInByte)
for {
n, err := file.Read(buf)
if err == io.EOF {
break
}
if err != nil {
http.Error(w, "file read error", http.StatusInternalServerError)
return
return fmt.Errorf("file read error")
}
req := &pb.GrpcUploadBinaryFileRequest{Request: &pb.GrpcUploadBinaryFileRequest_FileContent{FileContent: buf[:n]}}
req := &pb.GrpcUploadBinaryFileRequest{
Request: &pb.GrpcUploadBinaryFileRequest_FileContent{
FileContent: buf[:n],
},
}
if err = stream.Send(req); err != nil {
http.Error(w, "chunk send error", http.StatusInternalServerError)
return
return fmt.Errorf("chunk send error")
}
}
resp, err := stream.CloseAndRecv()
return nil
}
func RegisterGetFileEndpoint(mux *runtime.ServeMux) {
err := mux.HandlePath("GET", "/api/v1/file/content/{fileId}", getFileHandler)
if err != nil {
http.Error(w, "response error", http.StatusInternalServerError)
return
logger.Fatal("failed to register get file endpoint: %v", err)
}
}
w.WriteHeader(http.StatusOK)
w.Write([]byte(resp.FileId))
})
func getFileHandler(w http.ResponseWriter, r *http.Request, vars map[string]string) {
fileBuffer, err := fetchFileFromGrpc(r.Context(), vars["fileId"], r.Header.Get(GrpcAddressHeader))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = writeMultipartResponse(w, fileBuffer)
if err != nil {
logger.Fatal("endpoint registration failed: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func RegisterGetFileEndpoint(mux *runtime.ServeMux) {
err := mux.HandlePath("GET", "/api/v1/file/content/{fileId}", func(w http.ResponseWriter, r *http.Request, vars map[string]string) {
func fetchFileFromGrpc(ctx context.Context, fileId, grpcAddress string) (*bytes.Buffer, error) {
var fileBuffer bytes.Buffer
conn, err := grpc.NewClient(GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf), grpc.WithTransportCredentials(insecure.NewCredentials()))
target := GetGrpcServerUrl(grpcAddress, conf)
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
http.Error(w, "gRPC connection error", http.StatusInternalServerError)
return
return nil, fmt.Errorf("gRPC connection error")
}
defer conn.Close()
client := pb.NewBinaryFileServiceClient(conn)
req := &pb.GrpcGetBinaryFileDataRequest{FileId: vars["fileId"]}
clientStream, err := client.GetBinaryFileContent(r.Context(), req)
req := &pb.GrpcGetBinaryFileDataRequest{FileId: fileId}
clientStream, err := client.GetBinaryFileContent(ctx, req)
if err != nil {
http.Error(w, "stream creation error", http.StatusInternalServerError)
return
return nil, fmt.Errorf("stream creation error")
}
for {
......@@ -185,44 +238,40 @@ func RegisterGetFileEndpoint(mux *runtime.ServeMux) {
break
}
if err != nil {
http.Error(w, "error receiving file chunks", http.StatusInternalServerError)
return
return nil, fmt.Errorf("error receiving file chunks")
}
_, err = fileBuffer.Write(resp.FileContent)
if err != nil {
http.Error(w, "error writing file data", http.StatusInternalServerError)
return
return nil, fmt.Errorf("error writing file data")
}
}
w.Header().Set("Content-Type", "multipart/form-data")
w.Header().Set("Content-Disposition", `attachment; filename="file"`)
return &fileBuffer, nil
}
func writeMultipartResponse(w http.ResponseWriter, fileBuffer *bytes.Buffer) error {
w.Header().Set(contentTypeHeaderKey, "multipart/form-data")
w.Header().Set(contentDispositionHeaderKey, `attachment; filename="file"`)
w.WriteHeader(http.StatusOK)
mw := multipart.NewWriter(w)
part, err := mw.CreateFormFile("file", "file")
part, err := mw.CreateFormFile(fileFormKey, fileFormKey)
if err != nil {
http.Error(w, "error creating multipart form file", http.StatusInternalServerError)
return
return fmt.Errorf("error creating multipart form file")
}
_, err = fileBuffer.WriteTo(part)
if err != nil {
http.Error(w, "error writing file data to multipart", http.StatusInternalServerError)
return
return fmt.Errorf("error writing file data to multipart")
}
err = mw.Close()
if err != nil {
http.Error(w, "error closing multipart writer", http.StatusInternalServerError)
return
return fmt.Errorf("error closing multipart writer")
}
})
if err != nil {
logger.Fatal("endpoint registration failed: %v", err)
}
return nil
}
func ErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler, w http.ResponseWriter, r *http.Request, err error) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment