Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
C
common-lib
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
OZG-Cloud
lib
common-lib
Merge requests
!12
Ozg 7573 files weiterleitung bug
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Ozg 7573 files weiterleitung bug
OZG-7573-files-weiterleitung-bug
into
main
Overview
9
Commits
12
Pipelines
0
Changes
8
2 unresolved threads
Show all comments
Merged
Krzysztof Witukiewicz
requested to merge
OZG-7573-files-weiterleitung-bug
into
main
1 month ago
Overview
9
Commits
12
Pipelines
0
Changes
8
2 unresolved threads
Show all comments
Expand
0
0
Merge request reports
Compare
main
version 4
8f8039e7
1 month ago
version 3
7b7c478d
1 month ago
version 2
a0d677b3
1 month ago
version 1
4d3043cb
1 month ago
main (base)
and
latest version
latest version
bea6456c
12 commits,
1 month ago
version 4
8f8039e7
10 commits,
1 month ago
version 3
7b7c478d
9 commits,
1 month ago
version 2
a0d677b3
5 commits,
1 month ago
version 1
4d3043cb
3 commits,
1 month ago
8 files
+
1071
−
430
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
Files
8
Search (e.g. *.vue) (Ctrl+P)
ozgcloud-common-lib/src/main/java/de/ozgcloud/common/binaryfile/GrpcFileUploadUtils.java
+
33
−
189
Options
@@ -23,225 +23,69 @@
*/
package
de.ozgcloud.common.binaryfile
;
import
java.io.IOException
;
import
java.io.InputStream
;
import
java.util.Objects
;
import
java.util.Optional
;
import
java.util.concurrent.CompletableFuture
;
import
java.util.concurrent.atomic.AtomicBoolean
;
import
java.util.function.BiFunction
;
import
java.util.function.Consumer
;
import
java.util.function.Function
;
import
org.apache.commons.io.IOUtils
;
import
de.ozgcloud.common.errorhandling.TechnicalException
;
import
io.grpc.stub.CallStreamObserver
;
import
io.grpc.stub.StreamObserver
;
import
lombok.AccessLevel
;
import
lombok.Getter
;
import
lombok.NoArgsConstructor
;
import
lombok.NonNull
;
import
lombok.RequiredArgsConstructor
;
import
lombok.extern.log4j.Log4j2
;
@Log4j2
@NoArgsConstructor
(
access
=
AccessLevel
.
PRIVATE
)
public
class
GrpcFileUploadUtils
{
static
final
int
CHUNK_SIZE
=
4
*
1024
;
/*
*
Q = Request Type; S = Response Type
/**
* @param <Q> Request Type
* @param <S> Response Type
*
@deprecated use {@link #createStreamExclusiveFileSender(BiFunction, InputStream, Function)} instead
*/
@Deprecated
(
since
=
"4.13.0"
)
public
static
<
Q
,
S
>
FileSender
<
Q
,
S
>
createSender
(
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
,
InputStream
inputStream
,
Function
<
StreamObserver
<
S
>,
CallStreamObserver
<
Q
>>
reqObserverBuilder
)
{
return
creat
eSender
(
chunkBuilder
,
inputStream
,
reqObserverBuilder
,
true
);
return
new
Fil
eSender
<>
(
chunkBuilder
,
inputStream
,
reqObserverBuilder
);
}
public
static
<
Q
,
S
>
FileSender
<
Q
,
S
>
createSender
(
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
,
InputStream
inputStream
,
Function
<
StreamObserver
<
S
>,
CallStreamObserver
<
Q
>>
reqObserverBuilder
,
boolean
completeOnFileSent
)
{
return
new
FileSender
<>(
chunkBuilder
,
reqObserverBuilder
,
inputStream
,
completeOnFileSent
);
/**
* @param <Q> Request Type
* @param <S> Response Type
*/
public
static
<
Q
,
S
>
StreamingFileSender
<
Q
,
S
>
createStreamExclusiveFileSender
(
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
,
InputStream
inputStream
,
Function
<
StreamObserver
<
S
>,
CallStreamObserver
<
Q
>>
reqObserverBuilder
)
{
return
new
StreamExclusiveFileSender
<>(
chunkBuilder
,
inputStream
,
reqObserverBuilder
);
}
public
static
class
FileSender
<
Q
,
S
>
{
private
final
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
;
private
final
InputStream
inputStream
;
@Getter
private
final
CompletableFuture
<
S
>
resultFuture
=
new
CompletableFuture
<>();
private
final
Function
<
StreamObserver
<
S
>,
CallStreamObserver
<
Q
>>
reqObserverBuilder
;
private
CallStreamObserver
<
Q
>
requestObserver
;
private
Optional
<
Q
>
metaData
=
Optional
.
empty
();
private
final
AtomicBoolean
metaDataSent
=
new
AtomicBoolean
(
false
);
private
final
AtomicBoolean
done
=
new
AtomicBoolean
(
false
);
private
final
StreamReader
streamReader
;
private
final
boolean
completeOnFileSent
;
FileSender
(
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
,
Function
<
StreamObserver
<
S
>,
CallStreamObserver
<
Q
>>
reqObserverBuilder
,
InputStream
inputStream
)
{
this
(
chunkBuilder
,
reqObserverBuilder
,
inputStream
,
true
);
}
/**
* @param <Q> Request Type
* @param <S> Response Type
*/
public
static
<
Q
,
S
>
StreamingFileSender
<
Q
,
S
>
createStreamSharingSender
(
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
,
InputStream
inputStream
,
CallStreamObserver
<
Q
>
requestObserver
,
Consumer
<
Runnable
>
onReadyHandlerRegistrar
)
{
return
new
StreamSharingFileSender
<>(
chunkBuilder
,
inputStream
,
requestObserver
,
onReadyHandlerRegistrar
);
}
FileSender
(
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
,
Function
<
StreamObserver
<
S
>,
CallStreamObserver
<
Q
>>
reqObserverBuilder
,
InputStream
inputStream
,
boolean
completeOnFileSent
)
{
this
.
chunkBuilder
=
chunkBuilder
;
this
.
inputStream
=
inputStream
;
this
.
reqObserverBuilder
=
reqObserverBuilder
;
this
.
completeOnFileSent
=
completeOnFileSent
;
@Deprecated
(
since
=
"4.13.0"
)
public
static
class
FileSender
<
Q
,
S
>
extends
StreamExclusiveFileSender
<
Q
,
S
>
{
this
.
streamReader
=
new
StreamReader
(
this
.
inputStream
);
}
public
FileSender
<
Q
,
S
>
withMetaData
(
@NonNull
Q
metaData
)
{
this
.
metaData
=
Optional
.
of
(
metaData
);
return
this
;
FileSender
(
BiFunction
<
byte
[],
Integer
,
Q
>
chunkBuilder
,
InputStream
inputStream
,
Function
<
StreamObserver
<
S
>,
CallStreamObserver
<
Q
>>
reqObserverBuilder
)
{
super
(
chunkBuilder
,
inputStream
,
reqObserverBuilder
);
}
@Override
public
FileSender
<
Q
,
S
>
send
()
{
LOG
.
debug
(
"Start sending File."
);
var
responseObserver
=
BinaryFileUploadStreamObserver
.
create
(
resultFuture
,
this
::
sendNext
);
requestObserver
=
reqObserverBuilder
.
apply
(
responseObserver
);
super
.
send
();
return
this
;
}
public
void
cancelOnTimeout
()
{
LOG
.
warn
(
"File transfer canceled on timeout"
);
resultFuture
.
cancel
(
true
);
requestObserver
.
onError
(
new
TechnicalException
(
"Timeout on waiting for upload."
));
closeStreams
();
}
public
void
cancelOnError
(
Throwable
t
)
{
LOG
.
error
(
"File tranfer canceled on error."
,
t
);
resultFuture
.
cancel
(
true
);
requestObserver
.
onError
(
t
);
closeStreams
();
}
void
sendNext
()
{
if
(!
done
.
get
())
{
waitForOberver
();
sendMetaData
();
do
{
LOG
.
debug
(
"Sending next chunk."
);
sendNextChunk
();
}
while
(!
done
.
get
()
&&
isReady
());
LOG
.
debug
(
"Finished or waiting to become ready."
);
}
}
private
boolean
isReady
()
{
return
requestObserver
.
isReady
();
}
private
void
waitForOberver
()
{
synchronized
(
this
)
{
while
(
Objects
.
isNull
(
requestObserver
))
{
try
{
LOG
.
debug
(
"wait for observer"
);
wait
(
300
);
}
catch
(
InterruptedException
e
)
{
LOG
.
error
(
"Error on waiting for request Observer."
,
e
);
Thread
.
currentThread
().
interrupt
();
}
}
}
}
void
sendNextChunk
()
{
byte
[]
contentToSend
=
streamReader
.
getNextData
();
if
(
streamReader
.
getLastReadSize
()
>
0
)
{
sendChunk
(
contentToSend
,
streamReader
.
getLastReadSize
());
}
else
{
endTransfer
();
}
}
private
void
endTransfer
()
{
if
(
completeOnFileSent
)
requestObserver
.
onCompleted
();
else
sendEndOfFile
();
done
.
set
(
true
);
LOG
.
debug
(
"File Transfer done."
);
closeStreams
();
}
private
void
sendEndOfFile
()
{
sendChunk
(
new
byte
[
0
],
streamReader
.
getLastReadSize
());
}
void
closeStreams
()
{
LOG
.
debug
(
"Closing streams"
);
streamReader
.
close
();
}
void
sendChunk
(
byte
[]
content
,
int
length
)
{
LOG
.
debug
(
"Sending {} byte Data."
,
length
);
var
chunk
=
chunkBuilder
.
apply
(
content
,
length
);
requestObserver
.
onNext
(
chunk
);
}
byte
[]
readFromStream
()
{
try
{
return
inputStream
.
readNBytes
(
CHUNK_SIZE
);
}
catch
(
IOException
e
)
{
throw
new
TechnicalException
(
"Error on sending a single chunk"
,
e
);
}
}
void
sendMetaData
()
{
metaData
.
filter
(
md
->
!
metaDataSent
.
get
()).
ifPresent
(
this
::
doSendMetaData
);
}
private
void
doSendMetaData
(
Q
metadata
)
{
LOG
.
debug
(
"Sending Metadata."
);
requestObserver
.
onNext
(
metadata
);
metaDataSent
.
set
(
true
);
}
void
checkForEndOfStream
(
long
sentSize
)
{
if
(
sentSize
<
CHUNK_SIZE
)
{
LOG
.
debug
(
"File Transfer done. Closing stream."
);
IOUtils
.
closeQuietly
(
inputStream
);
requestObserver
.
onCompleted
();
done
.
set
(
true
);
}
else
{
LOG
.
debug
(
"File Transfer not jet done - need to tranfer another chunk."
);
}
}
@RequiredArgsConstructor
private
class
StreamReader
{
private
final
InputStream
inStream
;
private
final
byte
[]
buffer
=
new
byte
[
CHUNK_SIZE
];
@Getter
private
int
lastReadSize
=
0
;
@Getter
private
final
AtomicBoolean
done
=
new
AtomicBoolean
(
false
);
byte
[]
getNextData
()
{
readNext
();
return
buffer
;
}
void
close
()
{
IOUtils
.
closeQuietly
(
inStream
);
}
void
readNext
()
{
try
{
lastReadSize
=
inStream
.
read
(
buffer
,
0
,
CHUNK_SIZE
);
}
catch
(
IOException
e
)
{
throw
new
TechnicalException
(
"Error on reading a single chunk"
,
e
);
}
}
@Override
public
FileSender
<
Q
,
S
>
withMetaData
(
@NonNull
Q
metaData
)
{
super
.
withMetaData
(
metaData
);
return
this
;
}
}
Loading