Newer
Older
/*
* Copyright (C) 2023-2024
* Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package server
import (
"context"
"fmt"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"google.golang.org/grpc/credentials/insecure"
fileFormKey = "file"
vorgangIdFormKey = "vorgangId"
contentTypeHeaderKey = "Content-Type"
maxFileSizeInMB = 150
fileChunkSizeInByte = 1024 * 1024
// RegisterHomeEndpoint mounts a GET handler for the root path "/" on mux.
//
// The handler consists solely of a deferred recover guard: a recovered panic
// is logged and answered with HTTP 500. If the route cannot be registered,
// the failure is reported through logger.Fatal.
func RegisterHomeEndpoint(mux *runtime.ServeMux) {
	homeHandler := func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
		// recover only takes effect when called directly by the deferred function.
		defer func() {
			recovered := recover()
			if recovered == nil {
				return
			}
			logger.Error("failed to recover: %v", recovered)
			http.Error(w, fmt.Sprintf("failed to recover: %v", recovered), http.StatusInternalServerError)
		}()
	}

	if err := mux.HandlePath("GET", "/", homeHandler); err != nil {
		logger.Fatal("failed to register home endpoint: %v", err)
	}
}
// RegisterAntragraumEndpoints registers the AntragraumService gateway handlers
// on mux, using grpcUrl as the gRPC endpoint they forward to.
//
// The dial options carry insecure transport credentials
// (insecure.NewCredentials). A registration failure is reported through
// logger.Fatal.
func RegisterAntragraumEndpoints(ctx context.Context, mux *runtime.ServeMux, grpcUrl string) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	if err := pb.RegisterAntragraumServiceHandlerFromEndpoint(ctx, mux, grpcUrl, dialOpts); err != nil {
		logger.Fatal("failed to register antragraum endpoints: %v", err)
	}
}
// RegisterCommandEndpoints registers the CommandService gateway handlers on
// mux, using grpcUrl as the gRPC endpoint they forward to.
//
// The dial options carry insecure transport credentials
// (insecure.NewCredentials). A registration failure is reported through
// logger.Fatal.
func RegisterCommandEndpoints(ctx context.Context, mux *runtime.ServeMux, grpcUrl string) {
	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
	}

	if err := pb.RegisterCommandServiceHandlerFromEndpoint(ctx, mux, grpcUrl, dialOpts); err != nil {
		logger.Fatal("failed to register command endpoints: %v", err)
	}
}
// RegisterUploadAttachmentEndpoint routes POST /api/v1/file on mux to
// attachmentUploadHandler. A registration failure is reported through
// logger.Fatal.
func RegisterUploadAttachmentEndpoint(mux *runtime.ServeMux) {
	if err := mux.HandlePath("POST", "/api/v1/file", attachmentUploadHandler); err != nil {
		logger.Fatal("failed to register file upload endpoint: %v", err)
	}
}
// attachmentUploadHandler receives a multipart/form-data upload and forwards
// it over the gRPC stream obtained from createGrpcStream.
//
// The form must contain the file under fileFormKey; the metadata message is
// built from the request and the file header by sendFileMetadata, and the
// content is sent by sendFileChunks.
//
// Responses:
//   - 400 when the form cannot be parsed (including a body exceeding the
//     upload limit) or the file part is missing,
//   - 500 when the gRPC stream cannot be created, a send fails, or the
//     closing response cannot be received,
//   - 200 with the file id returned by the gRPC service as the body.
func attachmentUploadHandler(w http.ResponseWriter, r *http.Request, _ map[string]string) {
	maxUploadBytes := int64(maxFileSizeInMB) << 20

	// ParseMultipartForm's argument is only the in-memory threshold, not a
	// size limit, so the body itself has to be capped. One extra MiB leaves
	// room for the multipart framing and the remaining form fields.
	r.Body = http.MaxBytesReader(w, r.Body, maxUploadBytes+(1<<20))

	if err := r.ParseMultipartForm(maxUploadBytes); err != nil {
		http.Error(w, "invalid multipart form", http.StatusBadRequest)
		return
	}

	file, header, err := r.FormFile(fileFormKey)
	if err != nil {
		http.Error(w, "file retrieval error", http.StatusBadRequest)
		return
	}
	defer file.Close()

	conn, stream, err := createGrpcStream(r)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	defer conn.Close()

	if err := sendFileMetadata(stream, r, header); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	if err := sendFileChunks(stream, file); err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	resp, err := stream.CloseAndRecv()
	if err != nil {
		http.Error(w, "response error", http.StatusInternalServerError)
		return
	}

	w.WriteHeader(http.StatusOK)
	// The status line is already sent, so a failed body write cannot be
	// reported to the client any more; the result is deliberately ignored.
	_, _ = w.Write([]byte(resp.FileId))
}
func createGrpcStream(r *http.Request) (*grpc.ClientConn, pb.BinaryFileService_UploadBinaryFileAsStreamClient, error) {
target := GetGrpcServerUrl(r.Header.Get(GrpcAddressHeader), conf)
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, nil, fmt.Errorf("gRPC connection error")
}
client := pb.NewBinaryFileServiceClient(conn)
stream, err := client.UploadBinaryFileAsStream(r.Context())
// sendFileMetadata sends the metadata message for an upload on stream.
//
// The message carries the form value stored under vorgangIdFormKey, the
// uploaded file's name, and the content type taken from the file part's
// header. A failed send is reported as a generic "metadata send error"; the
// underlying cause is not included.
func sendFileMetadata(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, r *http.Request, header *multipart.FileHeader) error {
	metadata := &pb.GrpcUploadBinaryFileMetaData{
		VorgangId:   r.FormValue(vorgangIdFormKey),
		FileName:    header.Filename,
		ContentType: header.Header.Get(contentTypeHeaderKey),
	}
	request := &pb.GrpcUploadBinaryFileRequest{
		Request: &pb.GrpcUploadBinaryFileRequest_Metadata{Metadata: metadata},
	}

	sendErr := stream.Send(request)
	if sendErr == nil {
		return nil
	}
	return fmt.Errorf("metadata send error")
}
func sendFileChunks(stream pb.BinaryFileService_UploadBinaryFileAsStreamClient, file multipart.File) error {
buf := make([]byte, fileChunkSizeInByte)
for {
n, err := file.Read(buf)
if err == io.EOF {
break
}
req := &pb.GrpcUploadBinaryFileRequest{
Request: &pb.GrpcUploadBinaryFileRequest_FileContent{
FileContent: buf[:n],
},
}
if err = stream.Send(req); err != nil {
return fmt.Errorf("chunk send error")
// RegisterGetAttachmentContentEndpoint routes POST /api/v1/file/content on mux
// to getAttachmentContentHandler. A registration failure is reported through
// logger.Fatal.
func RegisterGetAttachmentContentEndpoint(mux *runtime.ServeMux) {
	if err := mux.HandlePath("POST", "/api/v1/file/content", getAttachmentContentHandler); err != nil {
		logger.Fatal("failed to register get file endpoint: %v", err)
	}
}
// getAttachmentContentHandler decodes the request body into an attachment
// content request, fetches the file via fetchFileFromGrpc and writes it to
// the response as application/octet-stream.
//
// Responses:
//   - 400 when the request body cannot be read or decoded,
//   - 500 when fetching the file fails,
//   - 200 with the file content otherwise.
func getAttachmentContentHandler(w http.ResponseWriter, r *http.Request, _ map[string]string) {
	requestPayload, err := getGetAttachmentContentRequestPayload(r)
	if err != nil {
		http.Error(w, "failed to read request body", http.StatusBadRequest)
		// Stop here: continuing would pass a nil payload on to the gRPC call
		// and attempt to write a second response.
		return
	}

	fileBuffer, err := fetchFileFromGrpc(r.Context(), requestPayload, r.Header.Get(GrpcAddressHeader))
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	w.Header().Set(contentTypeHeaderKey, "application/octet-stream")
	w.WriteHeader(http.StatusOK)

	if _, err = fileBuffer.WriteTo(w); err != nil {
		// NOTE(review): the 200 status line has already been sent, so this
		// call can no longer change the status code; kept as in the original.
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}
}
// getGetAttachmentContentRequestPayload reads the complete request body and
// decodes it with protojson into a GrpcGetAttachmentContentRequest.
//
// It returns the read or decode error unchanged. The body is closed on return
// once it has been read successfully.
func getGetAttachmentContentRequestPayload(r *http.Request) (*pb.GrpcGetAttachmentContentRequest, error) {
	raw, readErr := io.ReadAll(r.Body)
	if readErr != nil {
		return nil, readErr
	}
	defer r.Body.Close()

	payload := new(pb.GrpcGetAttachmentContentRequest)
	if decodeErr := protojson.Unmarshal(raw, payload); decodeErr != nil {
		return nil, decodeErr
	}

	return payload, nil
}
func fetchFileFromGrpc(ctx context.Context, request *pb.GrpcGetAttachmentContentRequest, grpcAddress string) (*bytes.Buffer, error) {
var fileBuffer bytes.Buffer
target := GetGrpcServerUrl(grpcAddress, conf)
conn, err := grpc.NewClient(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("gRPC connection error")
}
defer conn.Close()
client := pb.NewAntragraumServiceClient(conn)
clientStream, err := client.GetAttachmentContent(ctx, request)
if err != nil {
return nil, fmt.Errorf("stream creation error")
}
for {
resp, err := clientStream.Recv()
if err == io.EOF {
break
}
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
// ErrorHandler translates an error into an HTTP response.
//
// Errors for which status.FromError reports no gRPC status are delegated to
// runtime.DefaultHTTPErrorHandler. Otherwise the gRPC code is mapped to an
// HTTP status:
//
//	InvalidArgument -> 400 Bad Request
//	AlreadyExists   -> 409 Conflict
//	Unavailable     -> 503 Service Unavailable
//	anything else   -> 500 Internal Server Error
//
// The response body is the status message, sent with the content type
// reported by marshaler.
func ErrorHandler(ctx context.Context, mux *runtime.ServeMux, marshaler runtime.Marshaler, w http.ResponseWriter, r *http.Request, err error) {
	st, ok := status.FromError(err)
	if !ok {
		runtime.DefaultHTTPErrorHandler(ctx, mux, marshaler, w, r, err)
		return
	}

	var httpStatusCode int
	switch st.Code() {
	case codes.InvalidArgument:
		httpStatusCode = http.StatusBadRequest
	case codes.AlreadyExists:
		httpStatusCode = http.StatusConflict
	case codes.Unavailable:
		httpStatusCode = http.StatusServiceUnavailable
	default:
		httpStatusCode = http.StatusInternalServerError
	}

	w.Header().Set("Content-Type", marshaler.ContentType(r))
	w.WriteHeader(httpStatusCode)

	// A failed write to a single client is a per-request condition, so it is
	// logged as an error rather than through logger.Fatal, which is otherwise
	// reserved for unrecoverable startup failures.
	if _, writeErr := w.Write([]byte(st.Message())); writeErr != nil {
		logger.Error("failed to handle grpc error: %v", writeErr)
	}
}