Skip to content
Snippets Groups Projects
Commit 9abbd562 authored by Felix Reichenbach's avatar Felix Reichenbach
Browse files

OZG-7573 refactor EingangStubReceiverStreamObserver

parent 7dc12961
No related branches found
No related tags found
1 merge request!9Ozg 7573 forward vorgang
......@@ -72,8 +72,7 @@ class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRouteForwa
private final Map<String, List<IncomingFile>> attachments = new HashMap<>();
private String groupName;
private ContentCollector attachmentCollector;
private ContentCollector representationCollector;
private ContentCollector contentCollector;
@Override
public synchronized void onNext(GrpcRouteForwardingRequest request) {
......@@ -97,24 +96,28 @@ class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRouteForwa
void handleAttachment(GrpcAttachment attachment) {
if (attachment.hasFile()) {
attachmentCollector = buildContentCollector(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile()));
verifyNoOngoingFileDownload();
contentCollector = buildContentCollector(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile()));
groupName = attachment.getFile().getGroupName();
} else {
if (Objects.isNull(attachmentCollector)) {
throw new IllegalStateException("File content received before metadata.");
}
attachmentCollector.collect(attachment.getContent()).ifPresent(this::addAsAttachment);
verifyMetadataWasSent();
contentCollector.collect(attachment.getContent()).ifPresent(this::addAsAttachment);
}
}
void handleRepresentation(GrpcRepresentation representation) {
if (representation.hasFile()) {
representationCollector = buildContentCollector(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile()));
verifyNoOngoingFileDownload();
contentCollector = buildContentCollector(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile()));
} else {
if (Objects.isNull(representationCollector)) {
throw new IllegalStateException("File content received before metadata.");
verifyMetadataWasSent();
contentCollector.collect(representation.getContent()).ifPresent(this::addAsRepresentation);
}
representationCollector.collect(representation.getContent()).ifPresent(this::addAsRepresentation);
}
void verifyNoOngoingFileDownload() {
if (Objects.nonNull(contentCollector)) {
throw new IllegalStateException("File metadata received before previous file content was completed.");
}
}
......@@ -125,21 +128,26 @@ class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRouteForwa
.build();
}
void verifyMetadataWasSent() {
if (Objects.isNull(contentCollector)) {
throw new IllegalStateException("File content received before metadata.");
}
}
void addAsAttachment(IncomingFile completedIncomingFile) {
attachments.computeIfAbsent(groupName, s -> new ArrayList<>()).add(completedIncomingFile);
attachmentCollector = null;
contentCollector = null;
}
void addAsRepresentation(IncomingFile completedIncomingFile) {
representations.add(completedIncomingFile);
representationCollector = null;
contentCollector = null;
}
@Override
public synchronized void onError(Throwable t) {
LOG.error("Error happened. Receiving stream closed.", t);
attachmentCollector.close();
representationCollector.close();
contentCollector.close();
}
@Override
......
......@@ -199,12 +199,7 @@ class EingangStubReceiverStreamObserverTest {
class TestHandleAttachment {
@Mock
private ContentCollector attachmentCollector;
@BeforeEach
void setUp() {
setAttachmentCollector(attachmentCollector);
}
private ContentCollector contentCollector;
@Nested
class TestWithFile {
......@@ -213,8 +208,16 @@ class EingangStubReceiverStreamObserverTest {
private final IncomingFile incomingFile = IncomingFileTestFactory.create();
@BeforeEach
void mock() {
void initMock() {
when(incomingFileMapper.fromGrpcAttachmentFile(any())).thenReturn(incomingFile);
doNothing().when(observer).verifyNoOngoingFileDownload();
}
@Test
void shouldCallVerifyNoOngoingFileDownload() {
observer.handleAttachment(attachmentWithFile);
verify(observer).verifyNoOngoingFileDownload();
}
@Test
......@@ -225,12 +228,12 @@ class EingangStubReceiverStreamObserverTest {
}
@Test
void shouldSetAttachmentCollector() {
void shouldSetContentCollector() {
var expectedContentCollector = ContentCollector.builder().fileSaver(fileSaver).incomingFile(incomingFile).build();
observer.handleAttachment(attachmentWithFile);
assertThat(getAttachmentCollector()).usingRecursiveComparison().ignoringFields("pipedInput", "pipedOutput")
assertThat(getContentCollector()).usingRecursiveComparison().ignoringFields("pipedInput", "pipedOutput")
.isEqualTo(expectedContentCollector);
}
......@@ -245,7 +248,7 @@ class EingangStubReceiverStreamObserverTest {
void shouldNotCollectContent() {
observer.handleAttachment(attachmentWithFile);
verify(attachmentCollector, never()).collect(any());
verify(contentCollector, never()).collect(any());
}
}
......@@ -255,11 +258,23 @@ class EingangStubReceiverStreamObserverTest {
private final GrpcAttachment attachmentWithContent = GrpcAttachmentTestFactory.createWithContent();
private final IncomingFile incomingFile = IncomingFileTestFactory.create();
@BeforeEach
void setUp() {
setContentCollector(contentCollector);
}
@Test
void shouldCallVerifyMetadataWasSent() {
observer.handleAttachment(attachmentWithContent);
verify(observer).verifyMetadataWasSent();
}
@Test
void shouldCollectContent() {
observer.handleAttachment(attachmentWithContent);
verify(attachmentCollector).collect(GrpcAttachmentTestFactory.CONTENT);
verify(contentCollector).collect(GrpcAttachmentTestFactory.CONTENT);
}
@Test
......@@ -278,7 +293,7 @@ class EingangStubReceiverStreamObserverTest {
@Test
void shouldCallAddAsAttachment() {
when(attachmentCollector.collect(any())).thenReturn(Optional.of(incomingFile));
when(contentCollector.collect(any())).thenReturn(Optional.of(incomingFile));
observer.handleAttachment(attachmentWithContent);
......@@ -287,12 +302,13 @@ class EingangStubReceiverStreamObserverTest {
@Test
void shouldNotCallAddAsAttachment() {
when(attachmentCollector.collect(any())).thenReturn(Optional.empty());
when(contentCollector.collect(any())).thenReturn(Optional.empty());
observer.handleAttachment(attachmentWithContent);
verify(observer, never()).addAsAttachment(any());
}
}
}
......@@ -300,12 +316,7 @@ class EingangStubReceiverStreamObserverTest {
class TestHandleRepresentation {
@Mock
private ContentCollector representationCollector;
@BeforeEach
void setUp() {
setRepresentationCollector(representationCollector);
}
private ContentCollector contentCollector;
@Nested
class TestWithFile {
......@@ -316,6 +327,14 @@ class EingangStubReceiverStreamObserverTest {
@BeforeEach
void mock() {
when(incomingFileMapper.fromGrpcRepresentationFile(any())).thenReturn(incomingFile);
doNothing().when(observer).verifyNoOngoingFileDownload();
}
@Test
void shouldCallVerifyNoOngoingFileDownload() {
observer.handleRepresentation(representationWithFile);
verify(observer).verifyNoOngoingFileDownload();
}
@Test
......@@ -326,12 +345,12 @@ class EingangStubReceiverStreamObserverTest {
}
@Test
void shouldSetRepresentationCollector() {
void shouldSetContentCollector() {
var expectedContentCollector = ContentCollector.builder().fileSaver(fileSaver).incomingFile(incomingFile).build();
observer.handleRepresentation(representationWithFile);
assertThat(getRepresentationCollector()).usingRecursiveComparison().ignoringFields("pipedInput", "pipedOutput")
assertThat(getContentCollector()).usingRecursiveComparison().ignoringFields("pipedInput", "pipedOutput")
.isEqualTo(expectedContentCollector);
}
......@@ -339,7 +358,7 @@ class EingangStubReceiverStreamObserverTest {
void shouldNotCollectContent() {
observer.handleRepresentation(representationWithFile);
verify(representationCollector, never()).collect(any());
verify(contentCollector, never()).collect(any());
}
}
......@@ -349,11 +368,23 @@ class EingangStubReceiverStreamObserverTest {
private final GrpcRepresentation representationWithContent = GrpcRepresentationTestFactory.createWithContent();
private final IncomingFile incomingFile = IncomingFileTestFactory.create();
@BeforeEach
void setUp() {
setContentCollector(contentCollector);
}
@Test
void shouldCallVerifyMetadataWasSent() {
observer.handleRepresentation(representationWithContent);
verify(observer).verifyMetadataWasSent();
}
@Test
void shouldCollectContent() {
observer.handleRepresentation(representationWithContent);
verify(representationCollector).collect(GrpcRepresentationTestFactory.CONTENT);
verify(contentCollector).collect(GrpcRepresentationTestFactory.CONTENT);
}
@Test
......@@ -365,7 +396,7 @@ class EingangStubReceiverStreamObserverTest {
@Test
void shouldCallAddAsRepresentation() {
when(representationCollector.collect(any())).thenReturn(Optional.of(incomingFile));
when(contentCollector.collect(any())).thenReturn(Optional.of(incomingFile));
observer.handleRepresentation(representationWithContent);
......@@ -374,7 +405,7 @@ class EingangStubReceiverStreamObserverTest {
@Test
void shouldNotCallAddAsRepresentation() {
when(representationCollector.collect(any())).thenReturn(Optional.empty());
when(contentCollector.collect(any())).thenReturn(Optional.empty());
observer.handleRepresentation(representationWithContent);
......@@ -383,17 +414,49 @@ class EingangStubReceiverStreamObserverTest {
}
}
@Nested
class TestVerifyNoOngoingFileDownload {
@Test
void shouldThrowIllegalStateExceptionIfContentCollectorIsNotNull() {
setContentCollector(mock(ContentCollector.class));
assertThrows(IllegalStateException.class, () -> observer.verifyNoOngoingFileDownload());
}
@Test
void shouldNotThrowExceptionIfContentCollectorIsNull() {
assertDoesNotThrow(() -> observer.verifyNoOngoingFileDownload());
}
}
@Nested
class TestVerifyMetadataWasSent {
@Test
void shouldThrowIllegalStateExceptionIfContentCollectorIsNull() {
assertThrows(IllegalStateException.class, () -> observer.verifyMetadataWasSent());
}
@Test
void shouldNotThrowExceptionIfContentCollectorIsNotNull() {
setContentCollector(mock(ContentCollector.class));
assertDoesNotThrow(() -> observer.verifyMetadataWasSent());
}
}
@Nested
class TestAddAsAttachment {
@Mock
private ContentCollector attachmentCollector;
private ContentCollector contentCollector;
private final IncomingFile incomingFile = IncomingFileTestFactory.create();
@BeforeEach
void setUp() {
setRepresentationCollector(attachmentCollector);
setContentCollector(contentCollector);
setGroupName(GrpcAttachmentFileTestFactory.GROUP_NAME);
}
......@@ -406,10 +469,10 @@ class EingangStubReceiverStreamObserverTest {
}
@Test
void shouldSetAttachmentCollectorToNull() {
void shouldSetContentCollectorToNull() {
observer.addAsAttachment(incomingFile);
assertThat(getAttachmentCollector()).isNull();
assertThat(getContentCollector()).isNull();
}
}
......@@ -417,13 +480,13 @@ class EingangStubReceiverStreamObserverTest {
class TestAddAsRepresentation {
@Mock
private ContentCollector representationCollector;
private ContentCollector contentCollector;
private final IncomingFile incomingFile = IncomingFileTestFactory.create();
@BeforeEach
void setUp() {
setRepresentationCollector(representationCollector);
setContentCollector(contentCollector);
}
@Test
......@@ -434,10 +497,10 @@ class EingangStubReceiverStreamObserverTest {
}
@Test
void shouldSetRepresentationCollectorToNull() {
void shouldSetContentCollectorToNull() {
observer.addAsRepresentation(incomingFile);
assertThat(getRepresentationCollector()).isNull();
assertThat(getContentCollector()).isNull();
}
}
......@@ -445,28 +508,19 @@ class EingangStubReceiverStreamObserverTest {
class TestOnError {
@Mock
private ContentCollector attachmentCollector;
@Mock
private ContentCollector representationCollector;
private ContentCollector contentCollector;
@BeforeEach
void mock() {
setAttachmentCollector(attachmentCollector);
setRepresentationCollector(representationCollector);
setContentCollector(contentCollector);
setContentCollector(contentCollector);
}
@Test
void shouldCloseAttachmentCollector() {
void shouldCloseContentCollector() {
observer.onError(new Exception());
verify(attachmentCollector).close();
}
@Test
void shouldCloseRepresentationCollector() {
observer.onError(new Exception());
verify(representationCollector).close();
verify(contentCollector).close();
}
}
......@@ -591,20 +645,12 @@ class EingangStubReceiverStreamObserverTest {
ReflectionTestUtils.setField(observer, "attachments", attachments);
}
private ContentCollector getAttachmentCollector() {
return (ContentCollector) ReflectionTestUtils.getField(observer, "attachmentCollector");
}
private void setAttachmentCollector(ContentCollector attachmentCollector) {
ReflectionTestUtils.setField(observer, "attachmentCollector", attachmentCollector);
}
private ContentCollector getRepresentationCollector() {
return (ContentCollector) ReflectionTestUtils.getField(observer, "representationCollector");
private ContentCollector getContentCollector() {
return (ContentCollector) ReflectionTestUtils.getField(observer, "contentCollector");
}
private void setRepresentationCollector(ContentCollector representationCollector) {
ReflectionTestUtils.setField(observer, "representationCollector", representationCollector);
private void setContentCollector(ContentCollector contentCollector) {
ReflectionTestUtils.setField(observer, "contentCollector", contentCollector);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment