Newer
Older
/*
* Copyright (C) 2023-2024
* Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package server
import (
"context"
"fmt"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc/credentials/insecure"
fileFormKey = "file"
vorgangIdFormKey = "vorgangId"
contentTypeHeaderKey = "Content-Type"
contentDispositionHeaderKey = "Content-Disposition"
maxFileSizeInMB = 150
fileChunkSizeInByte = 1024 * 1024
// RegisterHomeEndpoint registers a handler for GET "/" on mux.
//
// The handler writes nothing itself; it only installs a deferred recover
// that logs a recovered panic value and reports it to the client as an
// HTTP 500. A registration failure is reported through logger.Fatal
// (presumably terminating the process — confirm against the logger package).
func RegisterHomeEndpoint(mux *runtime.ServeMux) {
	homeHandler := func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
		defer func() {
			// recover must be called directly inside the deferred function.
			recovered := recover()
			if recovered == nil {
				return
			}
			logger.Error("failed to recover: %v", recovered)
			http.Error(w, fmt.Sprintf("failed to recover: %v", recovered), http.StatusInternalServerError)
		}()
	}

	if regErr := mux.HandlePath("GET", "/", homeHandler); regErr != nil {
		logger.Fatal("failed to register home endpoint: %v", regErr)
	}
}
// RegisterAntragraumEndpoints wires the generated Antragraum service gateway
// handlers into mux, pointing them at the gRPC endpoint grpcUrl.
//
// The connection is configured with insecure transport credentials.
// A registration failure is reported through logger.Fatal.
func RegisterAntragraumEndpoints(ctx context.Context, mux *runtime.ServeMux, grpcUrl string) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	if regErr := pb.RegisterAntragraumServiceHandlerFromEndpoint(ctx, mux, grpcUrl, dialOpts); regErr != nil {
		logger.Fatal("failed to register antragraum endpoints: %v", regErr)
	}
}
// RegisterCommandEndpoints wires the generated Command service gateway
// handlers into mux, pointing them at the gRPC endpoint grpcUrl.
//
// The connection is configured with insecure transport credentials.
// A registration failure is reported through logger.Fatal.
func RegisterCommandEndpoints(ctx context.Context, mux *runtime.ServeMux, grpcUrl string) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	if regErr := pb.RegisterCommandServiceHandlerFromEndpoint(ctx, mux, grpcUrl, dialOpts); regErr != nil {
		logger.Fatal("failed to register command endpoints: %v", regErr)
	}
}
// RegisterBinaryFileEndpoints wires the generated BinaryFile service gateway
// handlers into mux, pointing them at the gRPC endpoint grpcUrl.
//
// The connection is configured with insecure transport credentials.
// A registration failure is reported through logger.Fatal.
func RegisterBinaryFileEndpoints(ctx context.Context, mux *runtime.ServeMux, grpcUrl string) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	if regErr := pb.RegisterBinaryFileServiceHandlerFromEndpoint(ctx, mux, grpcUrl, dialOpts); regErr != nil {
		logger.Fatal("failed to register binary file endpoints: %v", regErr)
	}
}
// RegisterFileUploadEndpoint routes POST /api/v1/file on mux to
// fileUploadHandler. A registration failure is reported through logger.Fatal.
func RegisterFileUploadEndpoint(mux *runtime.ServeMux) {
	if regErr := mux.HandlePath("POST", "/api/v1/file", fileUploadHandler); regErr != nil {
		logger.Fatal("failed to register file upload endpoint: %v", regErr)
	}
}
// fileUploadHandler accepts a multipart/form-data upload and forwards it to
// the binary file gRPC service as a client stream.
//
// Steps, in order:
//  1. parse the multipart form and pick the part named by fileFormKey;
//  2. open a gRPC upload stream via createGrpcStream;
//  3. send the metadata message, then the file content in chunks;
//  4. close the stream and write the returned file id as the response body.
//
// Malformed forms and a missing file part yield HTTP 400; every failure on
// the gRPC side yields HTTP 500.
//
// NOTE(review): the argument to ParseMultipartForm is the in-memory
// threshold for file parts, not an upload size limit — larger files spill to
// temporary files on disk. Confirm whether a hard limit is intended.
func fileUploadHandler(w http.ResponseWriter, r *http.Request, _ map[string]string) {
	// All gRPC-side failures are reported the same way.
	internalError := func(msg string) {
		http.Error(w, msg, http.StatusInternalServerError)
	}

	parseErr := r.ParseMultipartForm(maxFileSizeInMB << 20)
	if parseErr != nil {
		http.Error(w, "invalid multipart form", http.StatusBadRequest)
		return
	}

	upload, uploadHeader, formErr := r.FormFile(fileFormKey)
	if formErr != nil {
		http.Error(w, "file retrieval error", http.StatusBadRequest)
		return
	}
	defer upload.Close()

	conn, uploadStream, streamErr := createGrpcStream(r)
	if streamErr != nil {
		internalError(streamErr.Error())
		return
	}
	defer conn.Close()

	// The metadata message must precede any content chunk.
	metaErr := sendFileMetadata(uploadStream, r, uploadHeader)
	if metaErr != nil {
		internalError(metaErr.Error())
		return
	}

	chunkErr := sendFileChunks(uploadStream, upload)
	if chunkErr != nil {
		internalError(chunkErr.Error())
		return
	}

	reply, recvErr := uploadStream.CloseAndRecv()
	if recvErr != nil {
		internalError("response error")
		return
	}

	w.WriteHeader(http.StatusOK)
	w.Write([]byte(reply.FileId))
}
func createGrpcStream(r *http.Request) (*grpc.ClientConn, pb.BinaryFileService_UploadBinaryFileAsStreamClient, error) {
target := GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf)
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, fmt.Errorf("gRPC connection error")
}
client := pb.NewBinaryFileServiceClient(conn)
stream, err := client.UploadBinaryFileAsStream(r.Context())
// sendFileMetadata sends the metadata message of an upload on stream.
//
// The metadata carries the form value stored under vorgangIdFormKey, the
// uploaded file's name, and the content type declared in the file part's
// header. It returns a generic "metadata send error" if the stream rejects
// the message; the underlying cause is not included in the returned error.
func sendFileMetadata(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, r *http.Request, header *multipart.FileHeader) error {
	metadata := &pb.GrpcUploadBinaryFileMetaData{
		VorgangId:   r.FormValue(vorgangIdFormKey),
		FileName:    header.Filename,
		ContentType: header.Header.Get(contentTypeHeaderKey),
	}
	request := &pb.GrpcUploadBinaryFileRequest{
		Request: &pb.GrpcUploadBinaryFileRequest_Metadata{Metadata: metadata},
	}

	sendErr := stream.Send(request)
	if sendErr != nil {
		return fmt.Errorf("metadata send error")
	}
	return nil
}
func sendFileChunks(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, file multipart.File) error {
buf := make([]byte, fileChunkSizeInByte)
for {
n, err := file.Read(buf)
if err == io.EOF {
break
}
req := &pb.GrpcUploadBinaryFileRequest{
Request: &pb.GrpcUploadBinaryFileRequest_FileContent{
FileContent: buf[:n],
},
}
if err = stream.Send(req); err != nil {
return fmt.Errorf("chunk send error")
err := mux.HandlePath("GET", "/api/v1/file/content/{"+fileIdParam+"}", getFileHandler)
if err != nil {
logger.Fatal("failed to register get file endpoint: %v", err)
}
}
func getFileHandler(w http.ResponseWriter, r *http.Request, vars map[string]string) {
fileBuffer, err := fetchFileFromGrpc(r.Context(), vars[fileIdParam], r.Header.Get(GrpcAddressHeader))
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = writeMultipartResponse(w, fileBuffer)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func fetchFileFromGrpc(ctx context.Context, fileId string, grpcAddress string) (*bytes.Buffer, error) {
var fileBuffer bytes.Buffer
target := GetGrpcServerUrl(grpcAddress, conf)
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("gRPC connection error")
}
defer conn.Close()
client := pb.NewBinaryFileServiceClient(conn)
req := &pb.GrpcGetBinaryFileDataRequest{FileId: fileId}
clientStream, err := client.GetBinaryFileContent(ctx, req)
if err != nil {
return nil, fmt.Errorf("stream creation error")
}
for {
resp, err := clientStream.Recv()
if err == io.EOF {
break
}
func writeMultipartResponse(w http.ResponseWriter, fileBuffer *bytes.Buffer) error {
w.Header().Set(contentTypeHeaderKey, "application/octet-stream")
w.Header().Set(contentDispositionHeaderKey, `attachment; filename="file"`)
w.WriteHeader(http.StatusOK)
mw := multipart.NewWriter(w)
part, err := mw.CreateFormFile(fileFormKey, fileFormKey)
_, err = fileBuffer.WriteTo(part)
if err != nil {
return fmt.Errorf("error writing file data to multipart")
}
err = mw.Close()
if err != nil {
return fmt.Errorf("error closing multipart writer")
}
return nil
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
// ErrorHandler translates an error raised while serving a gateway request
// into an HTTP response.
//
// Errors that do not carry a gRPC status are delegated to
// runtime.DefaultHTTPErrorHandler. For gRPC status errors the code is mapped
// as follows, and the status message is written as the response body:
//
//	codes.InvalidArgument -> 400 Bad Request
//	codes.AlreadyExists   -> 409 Conflict
//	codes.Unavailable     -> 503 Service Unavailable
//	anything else         -> 500 Internal Server Error
//
// The Content-Type header is set to the marshaler's content type.
func ErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler, w http.ResponseWriter, r *http.Request, err error) {
	st, ok := status.FromError(err)
	if !ok {
		runtime.DefaultHTTPErrorHandler(ctx, mux, marshaler, w, r, err)
		return
	}

	// A switch avoids building a lookup map on every failed request.
	var httpStatusCode int
	switch st.Code() {
	case codes.InvalidArgument:
		httpStatusCode = http.StatusBadRequest
	case codes.AlreadyExists:
		httpStatusCode = http.StatusConflict
	case codes.Unavailable:
		httpStatusCode = http.StatusServiceUnavailable
	default:
		httpStatusCode = http.StatusInternalServerError
	}

	w.Header().Set("Content-Type", marshaler.ContentType(r))
	w.WriteHeader(httpStatusCode)

	// A failed write affects only this request (for example, the client has
	// gone away), so it is logged as an error and must not be treated as
	// fatal for the whole server.
	if _, writeErr := w.Write([]byte(st.Message())); writeErr != nil {
		logger.Error("failed to handle grpc error: %v", writeErr)
	}
}