From 90c18b5ad9acf0b6cb8e6fd579b0caf47afd8c58 Mon Sep 17 00:00:00 2001 From: OZGCloud <ozgcloud@mgm-tp.com> Date: Wed, 18 Sep 2024 09:33:26 +0200 Subject: [PATCH] OZG-6730 Refactoring --- internal/server/handler.go | 267 ++++++++++++++++++++++--------------- 1 file changed, 158 insertions(+), 109 deletions(-) diff --git a/internal/server/handler.go b/internal/server/handler.go index 1cfff24..88a294c 100644 --- a/internal/server/handler.go +++ b/internal/server/handler.go @@ -41,9 +41,12 @@ import ( ) const ( - fileFormKey = "file" - vorgangIdFormKey = "vorgangId" - contentTypeHeaderKey = "Content-Type" + fileFormKey = "file" + vorgangIdFormKey = "vorgangId" + contentTypeHeaderKey = "Content-Type" + contentDispositionHeaderKey = "Content-Disposition" + maxFileSizeInMB = 150 + fileChunkSizeInByte = 1024 * 1024 ) func RegisterHomeEndpoint(mux *runtime.ServeMux) { @@ -86,143 +89,189 @@ func RegisterBinaryFileEndpoints(ctx context.Context, mux *runtime.ServeMux, grp } func RegisterFileUploadEndpoint(mux *runtime.ServeMux) { - err := mux.HandlePath("POST", "/api/v1/file", func(w http.ResponseWriter, r *http.Request, _ map[string]string) { - if err := r.ParseMultipartForm(32 << 20); err != nil { - http.Error(w, "invalid multipart form", http.StatusBadRequest) - return - } + err := mux.HandlePath("POST", "/api/v1/file", fileUploadHandler) - file, header, err := r.FormFile(fileFormKey) - if err != nil { - http.Error(w, "file retrieval error", http.StatusBadRequest) - return - } - defer file.Close() + if err != nil { + logger.Fatal("failed to register file upload endpoint: %v", err) + } +} - conn, err := grpc.NewClient(GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - http.Error(w, "gRPC connection error", http.StatusInternalServerError) - return - } - defer conn.Close() +func fileUploadHandler(w http.ResponseWriter, r *http.Request, _ map[string]string) { + if err := r.ParseMultipartForm(maxFileSizeInMB << 20); err != nil { + http.Error(w, "invalid multipart form", http.StatusBadRequest) + return + } - client := pb.NewBinaryFileServiceClient(conn) - stream, err := client.UploadBinaryFileAsStream(r.Context()) - if err != nil { - http.Error(w, "stream creation error", http.StatusInternalServerError) - return - } + file, header, err := r.FormFile(fileFormKey) + if err != nil { + http.Error(w, "file retrieval error", http.StatusBadRequest) + return + } + defer file.Close() - meta := &pb.GrpcUploadBinaryFileRequest{ - Request: &pb.GrpcUploadBinaryFileRequest_Metadata{ - Metadata: &pb.GrpcUploadBinaryFileMetaData{ - VorgangId: r.FormValue(vorgangIdFormKey), - FileName: header.Filename, - ContentType: header.Header.Get(contentTypeHeaderKey), - }, - }, - } - if err := stream.Send(meta); err != nil { - http.Error(w, "metadata send error", http.StatusInternalServerError) - return - } + client, stream, err := createGrpcStream(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer client.Close() - buf := make([]byte, 1024*1024) - for { - n, err := file.Read(buf) - if err == io.EOF { - break - } - if err != nil { - http.Error(w, "file read error", http.StatusInternalServerError) - return - } + if err := sendFileMetadata(stream, r, header); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - req := &pb.GrpcUploadBinaryFileRequest{Request: &pb.GrpcUploadBinaryFileRequest_FileContent{FileContent: buf[:n]}} - if err = stream.Send(req); err != nil { - http.Error(w, "chunk send error", http.StatusInternalServerError) - return - } - } + if err := sendFileChunks(stream, file); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - resp, err := stream.CloseAndRecv() - if err != nil { - http.Error(w, "response error", http.StatusInternalServerError) - return - } + resp, err := stream.CloseAndRecv() + if err != nil { + http.Error(w, "response error", http.StatusInternalServerError) + return + } - w.WriteHeader(http.StatusOK) - w.Write([]byte(resp.FileId)) - }) + w.WriteHeader(http.StatusOK) + w.Write([]byte(resp.FileId)) +} + +func createGrpcStream(r *http.Request) (*grpc.ClientConn, pb.BinaryFileService_UploadBinaryFileAsStreamClient, error) { + target := GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf) + conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, nil, fmt.Errorf("gRPC connection error") + } + client := pb.NewBinaryFileServiceClient(conn) + stream, err := client.UploadBinaryFileAsStream(r.Context()) if err != nil { - logger.Fatal("endpoint registration failed: %v", err) + return nil, nil, fmt.Errorf("stream creation error") } + + return conn, stream, nil } -func RegisterGetFileEndpoint(mux *runtime.ServeMux) { - err := mux.HandlePath("GET", "/api/v1/file/content/{fileId}", func(w http.ResponseWriter, r *http.Request, vars map[string]string) { - var fileBuffer bytes.Buffer +func sendFileMetadata(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, r *http.Request, header *multipart.FileHeader) error { + meta := &pb.GrpcUploadBinaryFileRequest{ + Request: &pb.GrpcUploadBinaryFileRequest_Metadata{ + Metadata: &pb.GrpcUploadBinaryFileMetaData{ + VorgangId: r.FormValue(vorgangIdFormKey), + FileName: header.Filename, + ContentType: header.Header.Get(contentTypeHeaderKey), + }, + }, + } + + if err := stream.Send(meta); err != nil { + return fmt.Errorf("metadata send error") + } - conn, err := grpc.NewClient(GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf), grpc.WithTransportCredentials(insecure.NewCredentials())) + return nil +} + +func sendFileChunks(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, file multipart.File) error { + buf := make([]byte, fileChunkSizeInByte) + for { + n, err := file.Read(buf) + if err == io.EOF { + break + } if err != nil { - http.Error(w, "gRPC connection error", http.StatusInternalServerError) - return + return fmt.Errorf("file read error") } - defer conn.Close() - client := pb.NewBinaryFileServiceClient(conn) - req := &pb.GrpcGetBinaryFileDataRequest{FileId: vars["fileId"]} - clientStream, err := client.GetBinaryFileContent(r.Context(), req) - if err != nil { - http.Error(w, "stream creation error", http.StatusInternalServerError) - return + req := &pb.GrpcUploadBinaryFileRequest{ + Request: &pb.GrpcUploadBinaryFileRequest_FileContent{ + FileContent: buf[:n], + }, + } + if err = stream.Send(req); err != nil { + return fmt.Errorf("chunk send error") } + } - for { - resp, err := clientStream.Recv() - if err == io.EOF { - break - } - if err != nil { - http.Error(w, "error receiving file chunks", http.StatusInternalServerError) - return - } + return nil +} - _, err = fileBuffer.Write(resp.FileContent) - if err != nil { - http.Error(w, "error writing file data", http.StatusInternalServerError) - return - } - } +func RegisterGetFileEndpoint(mux *runtime.ServeMux) { + err := mux.HandlePath("GET", "/api/v1/file/content/{fileId}", getFileHandler) - w.Header().Set("Content-Type", "multipart/form-data") - w.Header().Set("Content-Disposition", `attachment; filename="file"`) - w.WriteHeader(http.StatusOK) + if err != nil { + logger.Fatal("failed to register get file endpoint: %v", err) + } +} - mw := multipart.NewWriter(w) - part, err := mw.CreateFormFile("file", "file") - if err != nil { - http.Error(w, "error creating multipart form file", http.StatusInternalServerError) - return - } +func getFileHandler(w http.ResponseWriter, r *http.Request, vars map[string]string) { + fileBuffer, err := fetchFileFromGrpc(r.Context(), vars["fileId"], r.Header.Get(GrpcAddressHeader)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - _, err = fileBuffer.WriteTo(part) + err = writeMultipartResponse(w, fileBuffer) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func fetchFileFromGrpc(ctx context.Context, fileId, grpcAddress string) (*bytes.Buffer, error) { + var fileBuffer bytes.Buffer + + target := GetGrpcServerUrl(grpcAddress, conf) + conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("gRPC connection error") + } + defer conn.Close() + + client := pb.NewBinaryFileServiceClient(conn) + req := &pb.GrpcGetBinaryFileDataRequest{FileId: fileId} + clientStream, err := client.GetBinaryFileContent(ctx, req) + if err != nil { + return nil, fmt.Errorf("stream creation error") + } + + for { + resp, err := clientStream.Recv() + if err == io.EOF { + break + } if err != nil { - http.Error(w, "error writing file data to multipart", http.StatusInternalServerError) - return + return nil, fmt.Errorf("error receiving file chunks") } - err = mw.Close() + _, err = fileBuffer.Write(resp.FileContent) if err != nil { - http.Error(w, "error closing multipart writer", http.StatusInternalServerError) - return + return nil, fmt.Errorf("error writing file data") } - }) + } + + return &fileBuffer, nil +} +func writeMultipartResponse(w http.ResponseWriter, fileBuffer *bytes.Buffer) error { + w.Header().Set(contentTypeHeaderKey, "multipart/form-data") + w.Header().Set(contentDispositionHeaderKey, `attachment; filename="file"`) + w.WriteHeader(http.StatusOK) + + mw := multipart.NewWriter(w) + part, err := mw.CreateFormFile(fileFormKey, fileFormKey) if err != nil { - logger.Fatal("endpoint registration failed: %v", err) + return fmt.Errorf("error creating multipart form file") } + + _, err = fileBuffer.WriteTo(part) + if err != nil { + return fmt.Errorf("error writing file data to multipart") + } + + err = mw.Close() + if err != nil { + return fmt.Errorf("error closing multipart writer") + } + + return nil } func ErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler, w http.ResponseWriter, r *http.Request, err error) { -- GitLab