diff --git a/internal/mock/grpc_server.go b/internal/mock/grpc_server.go index d501abf9b357610d7998816bfd0ac5dce8b10a5d..66f6fd73b5e8fef4ba4fb68bc48ee9dd6b9096f7 100644 --- a/internal/mock/grpc_server.go +++ b/internal/mock/grpc_server.go @@ -95,7 +95,11 @@ func (s *binaryFileServer) FindBinaryFilesMetaData(_ context.Context, _ *pb.Grpc return &pb.GrpcFindFilesResponse{}, nil } -func (s *binaryFileServer) GetBinaryFileContent(_ *pb.GrpcGetBinaryFileDataRequest, stream pb.BinaryFileService_GetBinaryFileContentServer) error { +func (s *binaryFileServer) GetBinaryFileContent(in *pb.GrpcGetBinaryFileDataRequest, stream pb.BinaryFileService_GetBinaryFileContentServer) error { + if in.FileId == "" { + return status.Error(codes.InvalidArgument, "FileId is missing") + } + fp := defaultFilePath if testing.Testing() { fp = testFilePath @@ -117,9 +121,7 @@ func (s *binaryFileServer) GetBinaryFileContent(_ *pb.GrpcGetBinaryFileDataReque return err } - chunk := &pb.GrpcGetBinaryFileDataResponse{ - FileContent: buffer[:bytesRead], - } + chunk := &pb.GrpcGetBinaryFileDataResponse{FileContent: buffer[:bytesRead]} if err := stream.Send(chunk); err != nil { return err } diff --git a/internal/server/grpc_router.go b/internal/server/grpc_router.go index ba506dae05d69bd15d4035adff243222ca4bcef8..b34e99683913f3ec106188865044ccb2e5610240 100644 --- a/internal/server/grpc_router.go +++ b/internal/server/grpc_router.go @@ -195,7 +195,7 @@ func StartGrpcRouter() *grpc.Server { } logger.Info("gRPC router listening on port %d", conf.Grpc.Router.Port) - if err := s.Serve(lis); err != nil { + if err = s.Serve(lis); err != nil { logger.Fatal("gRPC router failed to serve: %v", err) } diff --git a/internal/server/grpc_router_test.go b/internal/server/grpc_router_test.go index 0f81f57990bc82fe7c69d694d6b28b0ff2796ba1..15e55208b8f2453452b766aaa308dc68061480e9 100644 --- a/internal/server/grpc_router_test.go +++ b/internal/server/grpc_router_test.go @@ -69,7 +69,7 @@ func TestStartGrpcRouter(t *testing.T) { client, cleanUp := setUpGrpcRouterEnv() defer cleanUp() - md := metadata.New(map[string]string{"x-grpc-address": "localhost"}) + md := metadata.New(map[string]string{GrpcAddressMetadata: "localhost"}) ctx := metadata.NewOutgoingContext(context.Background(), md) resp, err := client.FindRueckfragen(ctx, &pb.GrpcFindRueckfragenRequest{PostfachId: "testPostfachId"}) @@ -81,7 +81,7 @@ func TestStartGrpcRouter(t *testing.T) { client, cleanUp := setUpGrpcRouterEnv() defer cleanUp() - md := metadata.New(map[string]string{"x-grpc-address": "localhost"}) + md := metadata.New(map[string]string{GrpcAddressMetadata: "localhost"}) ctx := metadata.NewOutgoingContext(context.Background(), md) resp, err := client.FindRueckfragen(ctx, &pb.GrpcFindRueckfragenRequest{}) diff --git a/internal/server/handler.go b/internal/server/handler.go index 1cfff2468b279681cd3a2462d3a46123d17a03d4..ba6bfb662f2f27ae3787b0e36f9ae6a2d4ea5654 100644 --- a/internal/server/handler.go +++ b/internal/server/handler.go @@ -41,9 +41,13 @@ import ( ) const ( - fileFormKey = "file" - vorgangIdFormKey = "vorgangId" - contentTypeHeaderKey = "Content-Type" + fileIdParam = "fileId" + fileFormKey = "file" + vorgangIdFormKey = "vorgangId" + contentTypeHeaderKey = "Content-Type" + contentDispositionHeaderKey = "Content-Disposition" + maxFileSizeInMB = 150 + fileChunkSizeInByte = 1024 * 1024 ) func RegisterHomeEndpoint(mux *runtime.ServeMux) { @@ -86,143 +90,189 @@ func RegisterBinaryFileEndpoints(ctx context.Context, mux *runtime.ServeMux, grp } func RegisterFileUploadEndpoint(mux *runtime.ServeMux) { - err := mux.HandlePath("POST", "/api/v1/file", func(w http.ResponseWriter, r *http.Request, _ map[string]string) { - if err := r.ParseMultipartForm(32 << 20); err != nil { - http.Error(w, "invalid multipart form", http.StatusBadRequest) - return - } + err := mux.HandlePath("POST", "/api/v1/file", fileUploadHandler) - file, header, err := r.FormFile(fileFormKey) - if err != nil { - http.Error(w, "file retrieval error", http.StatusBadRequest) - return - } - defer file.Close() + if err != nil { + logger.Fatal("failed to register file upload endpoint: %v", err) + } +} - conn, err := grpc.NewClient(GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - http.Error(w, "gRPC connection error", http.StatusInternalServerError) - return - } - defer conn.Close() +func fileUploadHandler(w http.ResponseWriter, r *http.Request, _ map[string]string) { + if err := r.ParseMultipartForm(maxFileSizeInMB << 20); err != nil { + http.Error(w, "invalid multipart form", http.StatusBadRequest) + return + } - client := pb.NewBinaryFileServiceClient(conn) - stream, err := client.UploadBinaryFileAsStream(r.Context()) - if err != nil { - http.Error(w, "stream creation error", http.StatusInternalServerError) - return - } + file, header, err := r.FormFile(fileFormKey) + if err != nil { + http.Error(w, "file retrieval error", http.StatusBadRequest) + return + } + defer file.Close() - meta := &pb.GrpcUploadBinaryFileRequest{ - Request: &pb.GrpcUploadBinaryFileRequest_Metadata{ - Metadata: &pb.GrpcUploadBinaryFileMetaData{ - VorgangId: r.FormValue(vorgangIdFormKey), - FileName: header.Filename, - ContentType: header.Header.Get(contentTypeHeaderKey), - }, - }, - } - if err := stream.Send(meta); err != nil { - http.Error(w, "metadata send error", http.StatusInternalServerError) - return - } + client, stream, err := createGrpcStream(r) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + defer client.Close() - buf := make([]byte, 1024*1024) - for { - n, err := file.Read(buf) - if err == io.EOF { - break - } - if err != nil { - http.Error(w, "file read error", http.StatusInternalServerError) - return - } + if err := sendFileMetadata(stream, r, header); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - req := &pb.GrpcUploadBinaryFileRequest{Request: &pb.GrpcUploadBinaryFileRequest_FileContent{FileContent: buf[:n]}} - if err = stream.Send(req); err != nil { - http.Error(w, "chunk send error", http.StatusInternalServerError) - return - } - } + if err := sendFileChunks(stream, file); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - resp, err := stream.CloseAndRecv() - if err != nil { - http.Error(w, "response error", http.StatusInternalServerError) - return - } + resp, err := stream.CloseAndRecv() + if err != nil { + http.Error(w, "response error", http.StatusInternalServerError) + return + } - w.WriteHeader(http.StatusOK) - w.Write([]byte(resp.FileId)) - }) + w.WriteHeader(http.StatusOK) + w.Write([]byte(resp.FileId)) +} + +func createGrpcStream(r *http.Request) (*grpc.ClientConn, pb.BinaryFileService_UploadBinaryFileAsStreamClient, error) { + target := GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf) + conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, nil, fmt.Errorf("gRPC connection error") + } + client := pb.NewBinaryFileServiceClient(conn) + stream, err := client.UploadBinaryFileAsStream(r.Context()) if err != nil { - logger.Fatal("endpoint registration failed: %v", err) + return nil, nil, fmt.Errorf("stream creation error") } + + return conn, stream, nil } -func RegisterGetFileEndpoint(mux *runtime.ServeMux) { - err := mux.HandlePath("GET", "/api/v1/file/content/{fileId}", func(w http.ResponseWriter, r *http.Request, vars map[string]string) { - var fileBuffer bytes.Buffer +func sendFileMetadata(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, r *http.Request, header *multipart.FileHeader) error { + meta := &pb.GrpcUploadBinaryFileRequest{ + Request: &pb.GrpcUploadBinaryFileRequest_Metadata{ + Metadata: &pb.GrpcUploadBinaryFileMetaData{ + VorgangId: r.FormValue(vorgangIdFormKey), + FileName: header.Filename, + ContentType: header.Header.Get(contentTypeHeaderKey), + }, + }, + } + + if err := stream.Send(meta); err != nil { + return fmt.Errorf("metadata send error") + } - conn, err := grpc.NewClient(GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf), grpc.WithTransportCredentials(insecure.NewCredentials())) + return nil +} + +func sendFileChunks(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, file multipart.File) error { + buf := make([]byte, fileChunkSizeInByte) + for { + n, err := file.Read(buf) + if err == io.EOF { + break + } if err != nil { - http.Error(w, "gRPC connection error", http.StatusInternalServerError) - return + return fmt.Errorf("file read error") } - defer conn.Close() - client := pb.NewBinaryFileServiceClient(conn) - req := &pb.GrpcGetBinaryFileDataRequest{FileId: vars["fileId"]} - clientStream, err := client.GetBinaryFileContent(r.Context(), req) - if err != nil { - http.Error(w, "stream creation error", http.StatusInternalServerError) - return + req := &pb.GrpcUploadBinaryFileRequest{ + Request: &pb.GrpcUploadBinaryFileRequest_FileContent{ + FileContent: buf[:n], + }, + } + if err = stream.Send(req); err != nil { + return fmt.Errorf("chunk send error") } + } - for { - resp, err := clientStream.Recv() - if err == io.EOF { - break - } - if err != nil { - http.Error(w, "error receiving file chunks", http.StatusInternalServerError) - return - } + return nil +} - _, err = fileBuffer.Write(resp.FileContent) - if err != nil { - http.Error(w, "error writing file data", http.StatusInternalServerError) - return - } - } +func RegisterGetFileEndpoint(mux *runtime.ServeMux) { + err := mux.HandlePath("GET", "/api/v1/file/content/{"+fileIdParam+"}", getFileHandler) - w.Header().Set("Content-Type", "multipart/form-data") - w.Header().Set("Content-Disposition", `attachment; filename="file"`) - w.WriteHeader(http.StatusOK) + if err != nil { + logger.Fatal("failed to register get file endpoint: %v", err) + } +} - mw := multipart.NewWriter(w) - part, err := mw.CreateFormFile("file", "file") - if err != nil { - http.Error(w, "error creating multipart form file", http.StatusInternalServerError) - return - } +func getFileHandler(w http.ResponseWriter, r *http.Request, vars map[string]string) { + fileBuffer, err := fetchFileFromGrpc(r.Context(), vars[fileIdParam], r.Header.Get(GrpcAddressHeader)) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } - _, err = fileBuffer.WriteTo(part) + err = writeMultipartResponse(w, fileBuffer) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func fetchFileFromGrpc(ctx context.Context, fileId string, grpcAddress string) (*bytes.Buffer, error) { + var fileBuffer bytes.Buffer + + target := GetGrpcServerUrl(grpcAddress, conf) + conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return nil, fmt.Errorf("gRPC connection error") + } + defer conn.Close() + + client := pb.NewBinaryFileServiceClient(conn) + req := &pb.GrpcGetBinaryFileDataRequest{FileId: fileId} + clientStream, err := client.GetBinaryFileContent(ctx, req) + if err != nil { + return nil, fmt.Errorf("stream creation error") + } + + for { + resp, err := clientStream.Recv() + if err == io.EOF { + break + } if err != nil { - http.Error(w, "error writing file data to multipart", http.StatusInternalServerError) - return + return nil, fmt.Errorf("error receiving file chunks") } - err = mw.Close() + _, err = fileBuffer.Write(resp.FileContent) if err != nil { - http.Error(w, "error closing multipart writer", http.StatusInternalServerError) - return + return nil, fmt.Errorf("error writing file data") } - }) + } + + return &fileBuffer, nil +} +func writeMultipartResponse(w http.ResponseWriter, fileBuffer *bytes.Buffer) error { + w.Header().Set(contentTypeHeaderKey, "application/octet-stream") + w.Header().Set(contentDispositionHeaderKey, `attachment; filename="file"`) + w.WriteHeader(http.StatusOK) + + mw := multipart.NewWriter(w) + part, err := mw.CreateFormFile(fileFormKey, fileFormKey) if err != nil { - logger.Fatal("endpoint registration failed: %v", err) + return fmt.Errorf("error creating multipart form file") } + + _, err = fileBuffer.WriteTo(part) + if err != nil { + return fmt.Errorf("error writing file data to multipart") + } + + err = mw.Close() + if err != nil { + return fmt.Errorf("error closing multipart writer") + } + + return nil } func ErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler, w http.ResponseWriter, r *http.Request, err error) { diff --git a/internal/server/handler_test.go b/internal/server/handler_test.go index c2e6d3578e4ed10402ae7d18012b6db642cf20d0..8aa40f2fe72fc6a1a22db92dd3f43d6bfdd41f43 100644 --- a/internal/server/handler_test.go +++ b/internal/server/handler_test.go @@ -129,7 +129,6 @@ func TestRegisterBinaryFileEndpoints(t *testing.T) { func TestRegisterFileUploadEndpoint(t *testing.T) { mock.SetUpGrpcServer() - SetUpGrpcRouter() SetUpHttpGateway() var requestBody bytes.Buffer @@ -161,7 +160,6 @@ func TestRegisterFileUploadEndpoint(t *testing.T) { func TestRegisterGetFileEndpoint(t *testing.T) { mock.SetUpGrpcServer() - SetUpGrpcRouter() SetUpHttpGateway() jsonData := []byte(``)