Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
V
vorgang-manager
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
OZG-Cloud
app
vorgang-manager
Merge requests
!27
OZG-7573 Dateien Weiterleiten
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
OZG-7573 Dateien Weiterleiten
OZG-7573-files-weiterleitung-bug
into
main
Overview
13
Commits
18
Pipelines
0
Changes
6
Merged
Krzysztof Witukiewicz
requested to merge
OZG-7573-files-weiterleitung-bug
into
main
1 month ago
Overview
13
Commits
18
Pipelines
0
Changes
6
Expand
0
0
Merge request reports
Compare
main
version 6
3269b04a
1 month ago
version 5
6a11e2d3
1 month ago
version 4
7c77fcf7
1 month ago
version 3
6a0dbd51
1 month ago
version 2
462e5e40
1 month ago
version 1
cb433faf
1 month ago
main (base)
and
latest version
latest version
4c263c57
18 commits,
1 month ago
version 6
3269b04a
17 commits,
1 month ago
version 5
6a11e2d3
16 commits,
1 month ago
version 4
7c77fcf7
15 commits,
1 month ago
version 3
6a0dbd51
14 commits,
1 month ago
version 2
462e5e40
12 commits,
1 month ago
version 1
cb433faf
11 commits,
1 month ago
6 files
+
1551
−
901
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Some changes are not shown
For a faster browsing experience, some files are collapsed by default.
Expand all files
Files
6
Search (e.g. *.vue) (Ctrl+P)
vorgang-manager-server/src/main/java/de/ozgcloud/vorgang/vorgang/redirect/EingangForwarder.java
0 → 100644
+
299
−
0
Options
/*
* Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
* Ministerpräsidenten des Landes Schleswig-Holstein
* Staatskanzlei
* Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
*
* Lizenziert unter der EUPL, Version 1.2 oder - sobald
* diese von der Europäischen Kommission genehmigt wurden -
* Folgeversionen der EUPL ("Lizenz");
* Sie dürfen dieses Werk ausschließlich gemäß
* dieser Lizenz nutzen.
* Eine Kopie der Lizenz finden Sie hier:
*
* https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
*
* Sofern nicht durch anwendbare Rechtsvorschriften
* gefordert oder in schriftlicher Form vereinbart, wird
* die unter der Lizenz verbreitete Software "so wie sie
* ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
* ausdrücklich oder stillschweigend - verbreitet.
* Die sprachspezifischen Genehmigungen und Beschränkungen
* unter der Lizenz sind dem Lizenztext zu entnehmen.
*/
package
de.ozgcloud.vorgang.vorgang.redirect
;
import
java.io.InputStream
;
import
java.util.List
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.concurrent.atomic.AtomicReference
;
import
java.util.function.BiFunction
;
import
org.apache.commons.io.IOUtils
;
import
org.springframework.beans.factory.config.ConfigurableBeanFactory
;
import
org.springframework.context.annotation.Scope
;
import
org.springframework.stereotype.Component
;
import
com.google.protobuf.ByteString
;
import
de.ozgcloud.common.binaryfile.GrpcFileUploadUtils
;
import
de.ozgcloud.common.binaryfile.StreamingFileSender
;
import
de.ozgcloud.common.errorhandling.TechnicalException
;
import
de.ozgcloud.eingang.forwarder.RouteForwardingServiceGrpc
;
import
de.ozgcloud.eingang.forwarding.GrpcAttachment
;
import
de.ozgcloud.eingang.forwarding.GrpcFileContent
;
import
de.ozgcloud.eingang.forwarding.GrpcRepresentation
;
import
de.ozgcloud.eingang.forwarding.GrpcRouteForwarding
;
import
de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest
;
import
de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse
;
import
de.ozgcloud.vorgang.callcontext.VorgangManagerClientCallContextAttachingInterceptor
;
import
de.ozgcloud.vorgang.files.FileService
;
import
de.ozgcloud.vorgang.vorgang.IncomingFile
;
import
de.ozgcloud.vorgang.vorgang.IncomingFileGroup
;
import
de.ozgcloud.vorgang.vorgang.IncomingFileMapper
;
import
io.grpc.stub.ClientCallStreamObserver
;
import
io.grpc.stub.ClientResponseObserver
;
import
lombok.RequiredArgsConstructor
;
import
lombok.extern.log4j.Log4j2
;
import
net.devh.boot.grpc.client.inject.GrpcClient
;
@Component
@Scope
(
ConfigurableBeanFactory
.
SCOPE_PROTOTYPE
)
@RequiredArgsConstructor
@Log4j2
class
EingangForwarder
{
static
final
int
TIMEOUT_MINUTES
=
2
;
@GrpcClient
(
"forwarder"
)
private
final
RouteForwardingServiceGrpc
.
RouteForwardingServiceStub
serviceStub
;
private
final
FileService
fileService
;
private
final
IncomingFileMapper
incomingFileMapper
;
private
ForwardingResponseObserver
responseObserver
;
private
ClientCallStreamObserver
<
GrpcRouteForwardingRequest
>
requestObserver
;
public
void
forward
(
GrpcRouteForwarding
grpcRouteForwarding
,
List
<
IncomingFileGroup
>
attachments
,
List
<
IncomingFile
>
representations
)
{
var
future
=
performGrpcCall
();
sendEingang
(
grpcRouteForwarding
,
attachments
,
representations
);
requestObserver
.
onCompleted
();
waitForCompletion
(
future
);
}
private
void
sendEingang
(
GrpcRouteForwarding
grpcRouteForwarding
,
List
<
IncomingFileGroup
>
attachments
,
List
<
IncomingFile
>
representations
)
{
sendRouteForwarding
(
grpcRouteForwarding
);
sendAttachments
(
attachments
);
sendRepresentations
(
representations
);
}
Future
<
GrpcRouteForwardingResponse
>
performGrpcCall
()
{
var
responseFuture
=
new
CompletableFuture
<
GrpcRouteForwardingResponse
>();
responseObserver
=
new
ForwardingResponseObserver
(
responseFuture
);
requestObserver
=
(
ClientCallStreamObserver
<
GrpcRouteForwardingRequest
>)
serviceStub
.
withInterceptors
(
new
VorgangManagerClientCallContextAttachingInterceptor
())
.
routeForwarding
(
responseObserver
);
return
responseFuture
;
}
void
sendRouteForwarding
(
GrpcRouteForwarding
grpcRouteForwarding
)
{
var
future
=
new
CompletableFuture
<
Void
>();
responseObserver
.
registerOnReadyHandler
(
getSendRouteForwardingRunnable
(
grpcRouteForwarding
,
future
));
waitForCompletion
(
future
);
}
Runnable
getSendRouteForwardingRunnable
(
GrpcRouteForwarding
grpcRouteForwarding
,
CompletableFuture
<
Void
>
future
)
{
var
executed
=
new
AtomicBoolean
();
return
()
->
{
if
(!
executed
.
compareAndExchange
(
false
,
true
))
{
requestObserver
.
onNext
(
GrpcRouteForwardingRequest
.
newBuilder
().
setRouteForwarding
(
grpcRouteForwarding
).
build
());
future
.
complete
(
null
);
}
};
}
void
sendAttachments
(
List
<
IncomingFileGroup
>
attachments
)
{
attachments
.
stream
()
.
flatMap
(
attachment
->
{
var
groupName
=
attachment
.
getName
();
return
attachment
.
getFiles
().
stream
().
map
(
file
->
new
FileInGroup
(
groupName
,
file
));
})
.
forEach
(
this
::
sendAttachmentFile
);
}
void
sendAttachmentFile
(
FileInGroup
fileInGroup
)
{
var
fileContentStream
=
fileService
.
getUploadedFileStream
(
fileInGroup
.
file
.
getId
());
var
fileSender
=
createAttachmentFileSender
(
fileInGroup
.
groupName
,
fileInGroup
.
file
,
fileContentStream
).
send
();
waitForCompletion
(
fileSender
,
fileContentStream
);
}
record
FileInGroup
(
String
groupName
,
IncomingFile
file
)
{
}
StreamingFileSender
<
GrpcRouteForwardingRequest
,
GrpcRouteForwardingResponse
>
createAttachmentFileSender
(
String
groupName
,
IncomingFile
file
,
InputStream
fileContentStream
)
{
return
createSenderWithoutMetadata
(
this
::
buildAttachmentChunk
,
fileContentStream
).
withMetaData
(
buildGrpcAttachmentFile
(
groupName
,
file
));
}
GrpcRouteForwardingRequest
buildAttachmentChunk
(
byte
[]
chunk
,
int
length
)
{
return
GrpcRouteForwardingRequest
.
newBuilder
()
.
setAttachment
(
GrpcAttachment
.
newBuilder
()
.
setContent
(
buildGrpcFileContent
(
chunk
,
length
))
.
build
())
.
build
();
}
GrpcRouteForwardingRequest
buildGrpcAttachmentFile
(
String
name
,
IncomingFile
file
)
{
return
GrpcRouteForwardingRequest
.
newBuilder
()
.
setAttachment
(
GrpcAttachment
.
newBuilder
()
.
setFile
(
incomingFileMapper
.
toAttachmentFile
(
name
,
file
))
.
build
())
.
build
();
}
void
sendRepresentations
(
List
<
IncomingFile
>
representations
)
{
representations
.
forEach
(
this
::
sendRepresentationFile
);
}
void
sendRepresentationFile
(
IncomingFile
file
)
{
var
fileContentStream
=
fileService
.
getUploadedFileStream
(
file
.
getId
());
var
fileSender
=
createRepresentationFileSender
(
file
,
fileContentStream
).
send
();
waitForCompletion
(
fileSender
,
fileContentStream
);
}
StreamingFileSender
<
GrpcRouteForwardingRequest
,
GrpcRouteForwardingResponse
>
createRepresentationFileSender
(
IncomingFile
file
,
InputStream
fileContentStream
)
{
return
createSenderWithoutMetadata
(
this
::
buildRepresentationChunk
,
fileContentStream
).
withMetaData
(
buildGrpcRepresentationFile
(
file
));
}
GrpcRouteForwardingRequest
buildRepresentationChunk
(
byte
[]
chunk
,
int
length
)
{
return
GrpcRouteForwardingRequest
.
newBuilder
()
.
setRepresentation
(
GrpcRepresentation
.
newBuilder
()
.
setContent
(
buildGrpcFileContent
(
chunk
,
length
))
.
build
())
.
build
();
}
GrpcRouteForwardingRequest
buildGrpcRepresentationFile
(
IncomingFile
file
)
{
return
GrpcRouteForwardingRequest
.
newBuilder
()
.
setRepresentation
(
GrpcRepresentation
.
newBuilder
()
.
setFile
(
incomingFileMapper
.
toRepresentationFile
(
file
))
.
build
())
.
build
();
}
GrpcFileContent
buildGrpcFileContent
(
byte
[]
chunk
,
int
length
)
{
var
fileContentBuilder
=
GrpcFileContent
.
newBuilder
();
if
(
length
<=
0
)
{
fileContentBuilder
.
setIsEndOfFile
(
true
);
}
else
{
fileContentBuilder
.
setContent
(
ByteString
.
copyFrom
(
chunk
));
}
return
fileContentBuilder
.
build
();
}
StreamingFileSender
<
GrpcRouteForwardingRequest
,
GrpcRouteForwardingResponse
>
createSenderWithoutMetadata
(
BiFunction
<
byte
[],
Integer
,
GrpcRouteForwardingRequest
>
chunkBuilder
,
InputStream
fileContentStream
)
{
return
GrpcFileUploadUtils
.
createStreamSharingSender
(
chunkBuilder
,
fileContentStream
,
requestObserver
,
responseObserver:
:
registerOnReadyHandler
);
}
<
T
>
void
waitForCompletion
(
Future
<
T
>
responseFuture
)
{
try
{
responseFuture
.
get
(
TIMEOUT_MINUTES
,
TimeUnit
.
MINUTES
);
}
catch
(
InterruptedException
e
)
{
Thread
.
currentThread
().
interrupt
();
throw
new
TechnicalException
(
"Waiting for finishing file upload was interrupted."
,
e
);
}
catch
(
ExecutionException
e
)
{
throw
new
TechnicalException
(
"Error on uploading file content."
,
e
);
}
catch
(
TimeoutException
e
)
{
throw
new
TechnicalException
(
"Timeout on uploading file content."
,
e
);
}
}
void
waitForCompletion
(
StreamingFileSender
<
GrpcRouteForwardingRequest
,
GrpcRouteForwardingResponse
>
fileSender
,
InputStream
fileContentStream
)
{
try
{
fileSender
.
getResultFuture
().
get
(
TIMEOUT_MINUTES
,
TimeUnit
.
MINUTES
);
}
catch
(
InterruptedException
e
)
{
Thread
.
currentThread
().
interrupt
();
fileSender
.
cancelOnError
(
e
);
throw
new
TechnicalException
(
"Waiting for finishing upload was interrupted."
,
e
);
}
catch
(
ExecutionException
e
)
{
fileSender
.
cancelOnError
(
e
);
throw
new
TechnicalException
(
"Error on uploading file content."
,
e
);
}
catch
(
TimeoutException
e
)
{
fileSender
.
cancelOnTimeout
();
throw
new
TechnicalException
(
"Timeout on uploading data."
,
e
);
}
finally
{
IOUtils
.
closeQuietly
(
fileContentStream
);
}
}
@RequiredArgsConstructor
static
class
ForwardingResponseObserver
implements
ClientResponseObserver
<
GrpcRouteForwardingRequest
,
GrpcRouteForwardingResponse
>
{
private
final
CompletableFuture
<
GrpcRouteForwardingResponse
>
future
;
private
DelegatingOnReadyHandler
onReadyHandler
;
private
GrpcRouteForwardingResponse
response
;
@Override
public
void
beforeStart
(
ClientCallStreamObserver
<
GrpcRouteForwardingRequest
>
requestStream
)
{
onReadyHandler
=
new
DelegatingOnReadyHandler
(
requestStream
);
requestStream
.
setOnReadyHandler
(
onReadyHandler
);
}
@Override
public
void
onNext
(
GrpcRouteForwardingResponse
response
)
{
this
.
response
=
response
;
}
@Override
public
void
onError
(
Throwable
t
)
{
onReadyHandler
.
stop
();
future
.
completeExceptionally
(
t
);
}
@Override
public
void
onCompleted
()
{
onReadyHandler
.
stop
();
future
.
complete
(
response
);
}
public
void
registerOnReadyHandler
(
Runnable
onReadyHandler
)
{
this
.
onReadyHandler
.
setDelegate
(
onReadyHandler
);
}
}
@RequiredArgsConstructor
static
class
DelegatingOnReadyHandler
implements
Runnable
{
private
final
ClientCallStreamObserver
<
GrpcRouteForwardingRequest
>
requestStream
;
private
final
AtomicReference
<
Runnable
>
delegateRef
=
new
AtomicReference
<>();
private
final
AtomicBoolean
done
=
new
AtomicBoolean
(
false
);
public
void
setDelegate
(
Runnable
onReadyHandler
)
{
this
.
delegateRef
.
set
(
onReadyHandler
);
}
public
void
stop
()
{
done
.
set
(
true
);
}
@Override
public
void
run
()
{
while
(!
done
.
get
()
&&
requestStream
.
isReady
())
{
if
(
Thread
.
currentThread
().
isInterrupted
())
{
break
;
}
var
delegate
=
delegateRef
.
get
();
if
(
delegate
!=
null
)
{
delegate
.
run
();
}
}
}
}
}
Loading