From 7dc12961d47a0816ce9eca12437aafbb85b4a0ab Mon Sep 17 00:00:00 2001
From: Felix Reichenbach <felix.reichenbach@mgm-tp.com>
Date: Fri, 21 Mar 2025 08:26:38 +0100
Subject: [PATCH] OZG-7573 refactor EingangStubReceiverStreamObserver and
 create ContentCollector

---
 .../formdata/IncomingFileTestFactory.java     |   4 +-
 .../eingang/forwarder/ContentCollector.java   | 112 +++
 .../EingangStubReceiverStreamObserver.java    | 125 +---
 .../forwarder/ContentCollectorTest.java       | 484 +++++++++++++
 ...EingangStubReceiverStreamObserverTest.java | 657 +++---------------
 5 files changed, 735 insertions(+), 647 deletions(-)
 create mode 100644 forwarder/src/main/java/de/ozgcloud/eingang/forwarder/ContentCollector.java
 create mode 100644 forwarder/src/test/java/de/ozgcloud/eingang/forwarder/ContentCollectorTest.java

diff --git a/common/src/test/java/de/ozgcloud/eingang/common/formdata/IncomingFileTestFactory.java b/common/src/test/java/de/ozgcloud/eingang/common/formdata/IncomingFileTestFactory.java
index f377a61cd..85a108436 100644
--- a/common/src/test/java/de/ozgcloud/eingang/common/formdata/IncomingFileTestFactory.java
+++ b/common/src/test/java/de/ozgcloud/eingang/common/formdata/IncomingFileTestFactory.java
@@ -23,6 +23,7 @@
  */
 package de.ozgcloud.eingang.common.formdata;
 
+import java.io.File;
 import java.util.UUID;
 
 import org.springframework.http.MediaType;
@@ -41,6 +42,7 @@ public class IncomingFileTestFactory {
 	public static final String PDF_CONTENT_TYPE = MediaType.APPLICATION_PDF_VALUE;
 	public static final String JSON_CONTENT_TYPE = MediaType.APPLICATION_JSON_VALUE;
 	public static final byte[] CONTENT = "TESTCONTENT1".getBytes();
+	public static final File FILE = TempFileUtils.writeTmpFile(CONTENT);
 	public static final long SIZE = 12;
 
 	public static IncomingFile create() {
@@ -53,7 +55,7 @@ public class IncomingFileTestFactory {
 				.vendorId(VENDOR_ID)
 				.name(NAME)
 				.contentType(CONTENT_TYPE)
-				.file(TempFileUtils.writeTmpFile(CONTENT))
+				.file(FILE)
 				.size(SIZE);
 	}
 
diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/ContentCollector.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/ContentCollector.java
new file mode 100644
index 000000000..5099f2385
--- /dev/null
+++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/ContentCollector.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.eingang.forwarder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.apache.commons.io.IOUtils;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import de.ozgcloud.eingang.common.formdata.IncomingFile;
+import de.ozgcloud.eingang.forwarding.GrpcFileContent;
+import lombok.Builder;
+import lombok.extern.log4j.Log4j2;
+
+@Log4j2
+class ContentCollector {
+	private static final int CHUNK_SIZE = 1024 * 64;
+	private static final long TIMEOUT_MINUTES = 10;
+
+	private final IncomingFile incomingFile;
+
+	private final PipedOutputStream pipedOutput;
+	private final PipedInputStream pipedInput;
+	private final CompletableFuture<File> receivingFileContent;
+
+	@Builder
+	private ContentCollector(Function<InputStream, CompletableFuture<File>> fileSaver, IncomingFile incomingFile) {
+		this.incomingFile = incomingFile;
+		try {
+			pipedInput = new PipedInputStream(CHUNK_SIZE);
+			pipedOutput = new PipedOutputStream(pipedInput);
+			receivingFileContent = fileSaver.apply(pipedInput);
+		} catch (IOException e) {
+			throw new TechnicalException("Upload initialization failed", e);
+		}
+	}
+
+	public Optional<IncomingFile> collect(GrpcFileContent fileContent) {
+		if (fileContent.getIsEndOfFile()) {
+			return Optional.of(handleEndOfFile());
+		}
+		try {
+			pipedOutput.write(fileContent.getContent().toByteArray());
+		} catch (IOException e) {
+			throw new TechnicalException("Error when writing file content.", e);
+		}
+		return Optional.empty();
+	}
+
+	IncomingFile handleEndOfFile() {
+		closeOutputPipe();
+		return incomingFile.toBuilder().file(getSavedFileContent()).build();
+	}
+
+	File getSavedFileContent() {
+		try {
+			return receivingFileContent.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
+		} catch (ExecutionException | TimeoutException e) {
+			throw new TechnicalException("Receiving file failed.", e);
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+			throw new TechnicalException("Upload was interrupted.", e);
+		} finally {
+			closeInputPipe();
+		}
+	}
+
+	public void close() {
+		closeOutputPipe();
+		closeInputPipe();
+	}
+
+	void closeOutputPipe() {
+		IOUtils.closeQuietly(pipedOutput, e -> LOG.error("Cannot close output stream.", e));
+	}
+
+	void closeInputPipe() {
+		IOUtils.closeQuietly(pipedInput, e -> LOG.error("Cannot close input stream.", e));
+	}
+
+}
diff --git a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java
index ec9080a75..45f253538 100644
--- a/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java
+++ b/forwarder/src/main/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserver.java
@@ -24,29 +24,19 @@
 package de.ozgcloud.eingang.forwarder;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
-import org.apache.commons.io.IOUtils;
-
-import de.ozgcloud.common.errorhandling.TechnicalException;
 import de.ozgcloud.eingang.common.formdata.FormData;
 import de.ozgcloud.eingang.common.formdata.IncomingFile;
 import de.ozgcloud.eingang.forwarding.GrpcAttachment;
-import de.ozgcloud.eingang.forwarding.GrpcFileContent;
 import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
 import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding;
 import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingRequest;
@@ -58,8 +48,6 @@ import lombok.extern.log4j.Log4j2;
 @Log4j2
 class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRouteForwardingRequest> {
 
-	private static final int CHUNK_SIZE = 1024 * 64;
-	private static final long TIMEOUT_MINUTES = 10;
 	private final RouteForwardingMapper routeForwardingMapper;
 	private final IncomingFileMapper incomingFileMapper;
 	private final IncomingFileGroupMapper incomingFileGroupMapper;
@@ -83,11 +71,9 @@ class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRouteForwa
 	private final List<IncomingFile> representations = new ArrayList<>();
 	private final Map<String, List<IncomingFile>> attachments = new HashMap<>();
 
-	private IncomingFile currentFile;
 	private String groupName;
-	private PipedOutputStream pipedOutput;
-	private PipedInputStream pipedInput;
-	private CompletableFuture<File> receivingFileContent;
+	private ContentCollector attachmentCollector;
+	private ContentCollector representationCollector;
 
 	@Override
 	public synchronized void onNext(GrpcRouteForwardingRequest request) {
@@ -111,114 +97,49 @@ class EingangStubReceiverStreamObserver implements StreamObserver<GrpcRouteForwa
 
 	void handleAttachment(GrpcAttachment attachment) {
 		if (attachment.hasFile()) {
-			setCurrentMetadata(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile()));
+			attachmentCollector = buildContentCollector(incomingFileMapper.fromGrpcAttachmentFile(attachment.getFile()));
 			groupName = attachment.getFile().getGroupName();
 		} else {
-			handleFileContent(attachment.getContent());
+			if (Objects.isNull(attachmentCollector)) {
+				throw new IllegalStateException("File content received before metadata.");
+			}
+			attachmentCollector.collect(attachment.getContent()).ifPresent(this::addAsAttachment);
 		}
 	}
 
 	void handleRepresentation(GrpcRepresentation representation) {
 		if (representation.hasFile()) {
-			setCurrentMetadata(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile()));
+			representationCollector = buildContentCollector(incomingFileMapper.fromGrpcRepresentationFile(representation.getFile()));
 		} else {
-			handleFileContent(representation.getContent());
-		}
-
-	}
-
-	void setCurrentMetadata(IncomingFile metaData) {
-		if (Objects.nonNull(currentFile)) {
-			throw new IllegalStateException("Received additional file before previous file reached the end.");
-		}
-		currentFile = metaData;
-	}
-
-	void handleFileContent(GrpcFileContent fileContent) {
-		if (Objects.isNull(receivingFileContent)) {
-			initContentReceiving();
-		}
-		storeFileContent(fileContent);
-	}
-
-	void initContentReceiving() {
-		try {
-			pipedInput = new PipedInputStream(CHUNK_SIZE);
-			pipedOutput = new PipedOutputStream(pipedInput);
-			receivingFileContent = fileSaver.apply(pipedInput);
-		} catch (IOException e) {
-			throw new TechnicalException("Upload initialization failed", e);
-		}
-	}
-
-	void storeFileContent(GrpcFileContent content) {
-		if (Objects.isNull(currentFile)) {
-			throw new IllegalStateException("File content received before metadata.");
-		}
-		if (content.getIsEndOfFile()) {
-			handleEndOfFile();
-		} else {
-			try {
-				pipedOutput.write(content.getContent().toByteArray());
-			} catch (IOException e) {
-				throw new TechnicalException("Error when writing file content.", e);
+			if (Objects.isNull(representationCollector)) {
+				throw new IllegalStateException("File content received before metadata.");
 			}
+			representationCollector.collect(representation.getContent()).ifPresent(this::addAsRepresentation);
 		}
 	}
 
-	void handleEndOfFile() {
-		closeOutputPipe();
-		var completedIncomingFile = currentFile.toBuilder().file(getSavedFileContent()).build();
-		if (Objects.isNull(groupName)) {
-			addAsRepresentation(completedIncomingFile);
-		} else {
-			addAsAttachment(completedIncomingFile);
-		}
-		resetFileReceiving();
-	}
-
-	private void addAsRepresentation(IncomingFile completedIncomingFile) {
-		representations.add(completedIncomingFile);
+	private ContentCollector buildContentCollector(IncomingFile incomingFile) {
+		return ContentCollector.builder()
+				.fileSaver(fileSaver)
+				.incomingFile(incomingFile)
+				.build();
 	}
 
-	private void addAsAttachment(IncomingFile completedIncomingFile) {
+	void addAsAttachment(IncomingFile completedIncomingFile) {
 		attachments.computeIfAbsent(groupName, s -> new ArrayList<>()).add(completedIncomingFile);
+		attachmentCollector = null;
 	}
 
-	File getSavedFileContent() {
-		try {
-			return receivingFileContent.get(TIMEOUT_MINUTES, TimeUnit.MINUTES);
-		} catch (ExecutionException | TimeoutException e) {
-			throw new TechnicalException("Receiving file failed.", e);
-		} catch (InterruptedException e) {
-			Thread.currentThread().interrupt();
-			throw new TechnicalException("Upload was interrupted.", e);
-		} finally {
-			closeInputPipe();
-		}
-	}
-
-	void resetFileReceiving() {
-		currentFile = null;
-		groupName = null;
-		pipedOutput = null;
-		pipedInput = null;
-		receivingFileContent = null;
+	void addAsRepresentation(IncomingFile completedIncomingFile) {
+		representations.add(completedIncomingFile);
+		representationCollector = null;
 	}
 
 	@Override
 	public synchronized void onError(Throwable t) {
 		LOG.error("Error happened. Receiving stream closed.", t);
-		closeOutputPipe();
-		closeInputPipe();
-	}
-
-	void closeOutputPipe() {
-		IOUtils.closeQuietly(pipedOutput, e -> LOG.error("Cannot close output stream.", e));
-	}
-
-	void closeInputPipe() {
-		IOUtils.closeQuietly(pipedInput, e -> LOG.error("Cannot close input stream.", e));
+		attachmentCollector.close();
+		representationCollector.close();
 	}
 
 	@Override
diff --git a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/ContentCollectorTest.java b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/ContentCollectorTest.java
new file mode 100644
index 000000000..4385eb7ef
--- /dev/null
+++ b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/ContentCollectorTest.java
@@ -0,0 +1,484 @@
+/*
+ * Copyright (C) 2025 Das Land Schleswig-Holstein vertreten durch den
+ * Ministerpräsidenten des Landes Schleswig-Holstein
+ * Staatskanzlei
+ * Abteilung Digitalisierung und zentrales IT-Management der Landesregierung
+ *
+ * Lizenziert unter der EUPL, Version 1.2 oder - sobald
+ * diese von der Europäischen Kommission genehmigt wurden -
+ * Folgeversionen der EUPL ("Lizenz");
+ * Sie dürfen dieses Werk ausschließlich gemäß
+ * dieser Lizenz nutzen.
+ * Eine Kopie der Lizenz finden Sie hier:
+ *
+ * https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
+ *
+ * Sofern nicht durch anwendbare Rechtsvorschriften
+ * gefordert oder in schriftlicher Form vereinbart, wird
+ * die unter der Lizenz verbreitete Software "so wie sie
+ * ist", OHNE JEGLICHE GEWÄHRLEISTUNG ODER BEDINGUNGEN -
+ * ausdrücklich oder stillschweigend - verbreitet.
+ * Die sprachspezifischen Genehmigungen und Beschränkungen
+ * unter der Lizenz sind dem Lizenztext zu entnehmen.
+ */
+package de.ozgcloud.eingang.forwarder;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import de.ozgcloud.common.errorhandling.TechnicalException;
+import de.ozgcloud.eingang.common.formdata.IncomingFile;
+import de.ozgcloud.eingang.common.formdata.IncomingFileTestFactory;
+import de.ozgcloud.eingang.forwarding.GrpcFileContent;
+import lombok.SneakyThrows;
+
+class ContentCollectorTest {
+
+	private ContentCollector contentCollector;
+	@Mock
+	private Function<InputStream, CompletableFuture<File>> fileSaver;
+	@Mock
+	private CompletableFuture<File> fileContentFuture;
+	private final IncomingFile incomingFile = IncomingFileTestFactory.createBuilder().file(null).build();
+
+	void setUpContentCollector() {
+		when(fileSaver.apply(any())).thenReturn(fileContentFuture);
+		contentCollector = spy(ContentCollector.builder().fileSaver(fileSaver).incomingFile(incomingFile).build());
+	}
+
+	@Nested
+	class TestBuilding {
+
+		private final byte[] content = new byte[] { 1, 2, 3 };
+
+		@Test
+		void shouldSetIncomingFile() {
+			setUpContentCollector();
+
+			assertThat(getIncomingFile()).isSameAs(incomingFile);
+		}
+
+		@Test
+		void shouldCreateInputStream() {
+			setUpContentCollector();
+
+			assertThat(getPipedInput()).isNotNull();
+		}
+
+		@Test
+		void shouldCreateOutputStream() {
+			setUpContentCollector();
+
+			assertThat(getPipedOutput()).isNotNull();
+		}
+
+		@Test
+		void shouldCreateConnectedStreams() {
+			setUpContentCollector();
+
+			verifyStreamSetUp();
+		}
+
+		@SneakyThrows
+		private void verifyStreamSetUp() {
+			var pipedInput = getPipedInput();
+			var pipedOutput = getPipedOutput();
+			pipedOutput.write(content);
+			pipedOutput.close();
+			var readBytes = pipedInput.readAllBytes();
+			assertThat(readBytes).isEqualTo(content);
+		}
+
+		@Test
+		void shouldCallFileSaver() {
+			setUpContentCollector();
+
+			verify(fileSaver).apply(getPipedInput());
+		}
+
+		@Test
+		void shouldSetReceivingFileContent() {
+			setUpContentCollector();
+
+			assertThat(getFileContent()).isSameAs(fileContentFuture);
+		}
+	}
+
+	@Nested
+	class TestCollect {
+
+		@Mock
+		private PipedOutputStream pipedOutput;
+
+		@BeforeEach
+		void setUp() {
+			setUpContentCollector();
+			setPipedOutput(pipedOutput);
+		}
+
+		@Nested
+		class TestOnEndOfFile {
+
+			private final GrpcFileContent fileContent = GrpcFileContentTestFactory.createBuilder().setIsEndOfFile(true).build();
+			private final IncomingFile completedIncomingFile = IncomingFileTestFactory.create();
+
+			@BeforeEach
+			void setUp() {
+				doReturn(completedIncomingFile).when(contentCollector).handleEndOfFile();
+			}
+
+			@Test
+			void shouldCallHandleEndOfFile() {
+				contentCollector.collect(fileContent);
+
+				verify(contentCollector).handleEndOfFile();
+			}
+
+			@Test
+			@SneakyThrows
+			void shouldNotWriteContentToOutputStream() {
+				contentCollector.collect(fileContent);
+
+				verify(pipedOutput, never()).write(any());
+			}
+
+			@Test
+			void shouldReturnCompletedIncomingFile() {
+				var result = contentCollector.collect(fileContent);
+
+				assertThat(result).contains(completedIncomingFile);
+			}
+		}
+
+		@Nested
+		class TestOnNotEndOfFile {
+
+			private final GrpcFileContent fileContent = GrpcFileContentTestFactory.createBuilder().setIsEndOfFile(false).build();
+
+			@Test
+			@SneakyThrows
+			void shouldWriteContentToOutputStream() {
+				contentCollector.collect(fileContent);
+
+				verify(pipedOutput).write(GrpcFileContentTestFactory.CONTENT);
+			}
+
+			@Test
+			void shouldNotCallHandleEndOfFile() {
+				contentCollector.collect(fileContent);
+
+				verify(contentCollector, never()).handleEndOfFile();
+			}
+
+			@Test
+			void shouldReturnEmpty() {
+				var result = contentCollector.collect(fileContent);
+
+				assertThat(result).isEmpty();
+			}
+
+			@Test
+			@SneakyThrows
+			void shouldThrowTechnicalExceptionOnIOException() {
+				doThrow(new IOException()).when(pipedOutput).write(any());
+
+				assertThrows(TechnicalException.class, () -> {
+					contentCollector.collect(fileContent);
+				});
+			}
+		}
+	}
+
+	@Nested
+	class TestHandleEndOfFile {
+
+		@BeforeEach
+		void setUp() {
+			setUpContentCollector();
+			doNothing().when(contentCollector).closeOutputPipe();
+			doReturn(IncomingFileTestFactory.FILE).when(contentCollector).getSavedFileContent();
+		}
+
+		@Test
+		void shouldCallCloseOutputPipe() {
+			contentCollector.handleEndOfFile();
+
+			verify(contentCollector).closeOutputPipe();
+		}
+
+		@Test
+		void shouldCallGetSavedFileContent() {
+			contentCollector.handleEndOfFile();
+
+			verify(contentCollector).getSavedFileContent();
+		}
+
+		@Test
+		void shouldReturnIncomingFileWithSavedFile() {
+			var returnedIncomingFile = contentCollector.handleEndOfFile();
+
+			assertThat(returnedIncomingFile).usingRecursiveComparison().isEqualTo(IncomingFileTestFactory.create());
+		}
+	}
+
+	@Nested
+	class TestGetSavedFileContent {
+		@BeforeEach
+		void setUp() {
+			setUpContentCollector();
+			doNothing().when(contentCollector).closeInputPipe();
+		}
+
+		@Nested
+		class TestOnNoExceptions {
+			@Mock
+			private File fileContent;
+
+			@BeforeEach
+			@SneakyThrows
+			void setUp() {
+				when(fileContentFuture.get(anyLong(), any())).thenReturn(fileContent);
+			}
+
+			@Test
+			void shouldReturnFile() {
+				var savedFileContent = contentCollector.getSavedFileContent();
+
+				assertThat(savedFileContent).isSameAs(fileContent);
+			}
+
+			@Test
+			void shouldCallCloseInputPipe() {
+				contentCollector.getSavedFileContent();
+
+				verify(contentCollector).closeInputPipe();
+			}
+		}
+
+		@Nested
+		class TestOnExecutionException {
+
+			@BeforeEach
+			void setUp() {
+				setFileContent(CompletableFuture.failedFuture(new Exception()));
+			}
+
+			@Test
+			void shouldThrowTechnicalException() {
+				assertThrows(TechnicalException.class, () -> contentCollector.getSavedFileContent());
+			}
+
+			@Test
+			void shouldCallCloseInputPipe() {
+				try {
+					contentCollector.getSavedFileContent();
+				} catch (TechnicalException e) {
+					// expected
+				}
+
+				verify(contentCollector).closeInputPipe();
+			}
+		}
+
+		@Nested
+		class TestOnTimeoutException {
+
+			@Mock
+			private CompletableFuture<File> fileFuture;
+
+			@BeforeEach
+			@SneakyThrows
+			void setUp() {
+				setFileContent(fileFuture);
+				when(fileFuture.get(anyLong(), any())).thenThrow(new TimeoutException());
+			}
+
+			@Test
+			void shouldThrowTechnicalException() {
+				assertThrows(TechnicalException.class, () -> contentCollector.getSavedFileContent());
+			}
+
+			@Test
+			void shouldCallCloseInputPipe() {
+				try {
+					contentCollector.getSavedFileContent();
+				} catch (TechnicalException e) {
+					// expected
+				}
+
+				verify(contentCollector).closeInputPipe();
+			}
+		}
+
+		@Nested
+		class TestOnInterruptedException {
+
+			@Mock
+			private CompletableFuture<File> fileFuture;
+
+			@BeforeEach
+			@SneakyThrows
+			void setUp() {
+				setFileContent(fileFuture);
+				when(fileFuture.get(anyLong(), any())).thenThrow(new InterruptedException());
+			}
+
+			@Test
+			void shouldThrowTechnicalException() {
+				assertThrows(TechnicalException.class, () -> contentCollector.getSavedFileContent());
+			}
+
+			@Test
+			void shouldInterruptCurrentThread() {
+				try {
+					contentCollector.getSavedFileContent();
+				} catch (TechnicalException e) {
+					// expected
+				}
+
+				assertThat(Thread.currentThread().isInterrupted()).isTrue();
+			}
+
+			@Test
+			void shouldCallCloseInputPipe() {
+				try {
+					contentCollector.getSavedFileContent();
+				} catch (TechnicalException e) {
+					// expected
+				}
+
+				verify(contentCollector).closeInputPipe();
+			}
+		}
+	}
+
+	@Nested
+	class TestClose {
+
+		@BeforeEach
+		void setUp() {
+			setUpContentCollector();
+			doNothing().when(contentCollector).closeOutputPipe();
+			doNothing().when(contentCollector).closeInputPipe();
+		}
+
+		@Test
+		void shouldCallCloseOutputPipe() {
+			contentCollector.close();
+
+			verify(contentCollector).closeOutputPipe();
+		}
+
+		@Test
+		void shouldCallCloseInputPipe() {
+			contentCollector.close();
+
+			verify(contentCollector).closeInputPipe();
+		}
+	}
+
+	@Nested
+	class TestCloseOutputPipe {
+
+		@Mock
+		private PipedOutputStream pipedOutput;
+
+		@BeforeEach
+		void setUp() {
+			setUpContentCollector();
+			setPipedOutput(pipedOutput);
+		}
+
+		@Test
+		@SneakyThrows
+		void shouldClosePipedOutput() {
+			contentCollector.closeOutputPipe();
+
+			verify(pipedOutput).close();
+		}
+
+		@Test
+		@SneakyThrows
+		void shouldNotThrowException() {
+			doThrow(IOException.class).when(pipedOutput).close();
+
+			assertDoesNotThrow(() -> contentCollector.closeOutputPipe());
+		}
+	}
+
+	@Nested
+	class TestCloseInputPipe {
+
+		@Mock
+		private PipedInputStream pipedInput;
+
+		@BeforeEach
+		void setUp() {
+			setUpContentCollector();
+			setPipedInput(pipedInput);
+		}
+
+		@Test
+		@SneakyThrows
+		void shouldClosePipedInput() {
+			contentCollector.closeInputPipe();
+
+			verify(pipedInput).close();
+		}
+
+		@Test
+		@SneakyThrows
+		void shouldNotThrowException() {
+			doThrow(IOException.class).when(pipedInput).close();
+
+			assertDoesNotThrow(() -> contentCollector.closeInputPipe());
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	private CompletableFuture<File> getFileContent() {
+		return (CompletableFuture<File>) ReflectionTestUtils.getField(contentCollector, "receivingFileContent");
+	}
+
+	private void setFileContent(CompletableFuture<File> fileContent) {
+		ReflectionTestUtils.setField(contentCollector, "receivingFileContent", fileContent);
+	}
+
+	private PipedInputStream getPipedInput() {
+		return (PipedInputStream) ReflectionTestUtils.getField(contentCollector, "pipedInput");
+	}
+
+	private void setPipedInput(PipedInputStream pipedInput) {
+		ReflectionTestUtils.setField(contentCollector, "pipedInput", pipedInput);
+	}
+
+	private PipedOutputStream getPipedOutput() {
+		return (PipedOutputStream) ReflectionTestUtils.getField(contentCollector, "pipedOutput");
+	}
+
+	private void setPipedOutput(PipedOutputStream pipedOutput) {
+		ReflectionTestUtils.setField(contentCollector, "pipedOutput", pipedOutput);
+	}
+
+	private IncomingFile getIncomingFile() {
+		return (IncomingFile) ReflectionTestUtils.getField(contentCollector, "incomingFile");
+	}
+
+}
diff --git a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java
index 04d21376b..f312ec86f 100644
--- a/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java
+++ b/forwarder/src/test/java/de/ozgcloud/eingang/forwarder/EingangStubReceiverStreamObserverTest.java
@@ -29,14 +29,11 @@ import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.*;
 
 import java.io.File;
-import java.io.IOException;
 import java.io.InputStream;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeoutException;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -46,18 +43,15 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
 import org.springframework.test.util.ReflectionTestUtils;
 
-import de.ozgcloud.common.errorhandling.TechnicalException;
 import de.ozgcloud.eingang.common.formdata.FormData;
 import de.ozgcloud.eingang.common.formdata.FormDataTestFactory;
 import de.ozgcloud.eingang.common.formdata.IncomingFile;
 import de.ozgcloud.eingang.common.formdata.IncomingFileGroupTestFactory;
 import de.ozgcloud.eingang.common.formdata.IncomingFileTestFactory;
 import de.ozgcloud.eingang.forwarding.GrpcAttachment;
-import de.ozgcloud.eingang.forwarding.GrpcFileContent;
 import de.ozgcloud.eingang.forwarding.GrpcRepresentation;
 import de.ozgcloud.eingang.forwarding.GrpcRouteForwarding;
 import de.ozgcloud.eingang.forwarding.GrpcRouteForwardingResponse;
-import lombok.SneakyThrows;
 
 class EingangStubReceiverStreamObserverTest {
 
@@ -204,6 +198,14 @@ class EingangStubReceiverStreamObserverTest {
 	@Nested
 	class TestHandleAttachment {
 
+		@Mock
+		private ContentCollector attachmentCollector;
+
+		@BeforeEach
+		void setUp() {
+			setAttachmentCollector(attachmentCollector);
+		}
+
 		@Nested
 		class TestWithFile {
 
@@ -212,7 +214,6 @@ class EingangStubReceiverStreamObserverTest {
 
 			@BeforeEach
 			void mock() {
-				doNothing().when(observer).setCurrentMetadata(any());
 				when(incomingFileMapper.fromGrpcAttachmentFile(any())).thenReturn(incomingFile);
 			}
 
@@ -224,10 +225,13 @@ class EingangStubReceiverStreamObserverTest {
 			}
 
 			@Test
-			void shouldCallSetCurrentMetadata() {
+			void shouldSetAttachmentCollector() {
+				var expectedContentCollector = ContentCollector.builder().fileSaver(fileSaver).incomingFile(incomingFile).build();
+
 				observer.handleAttachment(attachmentWithFile);
 
-				verify(observer).setCurrentMetadata(incomingFile);
+				assertThat(getAttachmentCollector()).usingRecursiveComparison().ignoringFields("pipedInput", "pipedOutput")
+						.isEqualTo(expectedContentCollector);
 			}
 
 			@Test
@@ -238,10 +242,10 @@ class EingangStubReceiverStreamObserverTest {
 			}
 
 			@Test
-			void shouldNotCallHandleFileContent() {
+			void shouldNotCollectContent() {
 				observer.handleAttachment(attachmentWithFile);
 
-				verify(observer, never()).handleFileContent(any());
+				verify(attachmentCollector, never()).collect(any());
 			}
 		}
 
@@ -249,17 +253,13 @@ class EingangStubReceiverStreamObserverTest {
 		class TestWithContent {
 
 			private final GrpcAttachment attachmentWithContent = GrpcAttachmentTestFactory.createWithContent();
-
-			@BeforeEach
-			void mock() {
-				doNothing().when(observer).handleFileContent(any());
-			}
+			private final IncomingFile incomingFile = IncomingFileTestFactory.create();
 
 			@Test
-			void shouldCallHandleFileContent() {
+			void shouldCollectContent() {
 				observer.handleAttachment(attachmentWithContent);
 
-				verify(observer).handleFileContent(GrpcAttachmentTestFactory.CONTENT);
+				verify(attachmentCollector).collect(GrpcAttachmentTestFactory.CONTENT);
 			}
 
 			@Test
@@ -275,12 +275,38 @@ class EingangStubReceiverStreamObserverTest {
 
 				assertThat(getGroupName()).isNull();
 			}
+
+			@Test
+			void shouldCallAddAsAttachment() {
+				when(attachmentCollector.collect(any())).thenReturn(Optional.of(incomingFile));
+
+				observer.handleAttachment(attachmentWithContent);
+
+				verify(observer).addAsAttachment(incomingFile);
+			}
+
+			@Test
+			void shouldNotCallAddAsAttachment() {
+				when(attachmentCollector.collect(any())).thenReturn(Optional.empty());
+
+				observer.handleAttachment(attachmentWithContent);
+
+				verify(observer, never()).addAsAttachment(any());
+			}
 		}
 	}
 
 	@Nested
 	class TestHandleRepresentation {
 
+		@Mock
+		private ContentCollector representationCollector;
+
+		@BeforeEach
+		void setUp() {
+			setRepresentationCollector(representationCollector);
+		}
+
 		@Nested
 		class TestWithFile {
 
@@ -289,7 +315,6 @@ class EingangStubReceiverStreamObserverTest {
 
 			@BeforeEach
 			void mock() {
-				doNothing().when(observer).setCurrentMetadata(any());
 				when(incomingFileMapper.fromGrpcRepresentationFile(any())).thenReturn(incomingFile);
 			}
 
@@ -301,17 +326,20 @@ class EingangStubReceiverStreamObserverTest {
 			}
 
 			@Test
-			void shouldCallSetCurrentMetadata() {
+			void shouldSetRepresentationCollector() {
+				var expectedContentCollector = ContentCollector.builder().fileSaver(fileSaver).incomingFile(incomingFile).build();
+
 				observer.handleRepresentation(representationWithFile);
 
-				verify(observer).setCurrentMetadata(incomingFile);
+				assertThat(getRepresentationCollector()).usingRecursiveComparison().ignoringFields("pipedInput", "pipedOutput")
+						.isEqualTo(expectedContentCollector);
 			}
 
 			@Test
-			void shouldNotCallHandleFileContent() {
+			void shouldNotCollectContent() {
 				observer.handleRepresentation(representationWithFile);
 
-				verify(observer, never()).handleFileContent(any());
+				verify(representationCollector, never()).collect(any());
 			}
 		}
 
@@ -319,17 +347,13 @@ class EingangStubReceiverStreamObserverTest {
 		class TestWithContent {
 
 			private final GrpcRepresentation representationWithContent = GrpcRepresentationTestFactory.createWithContent();
-
-			@BeforeEach
-			void mock() {
-				doNothing().when(observer).handleFileContent(any());
-			}
+			private final IncomingFile incomingFile = IncomingFileTestFactory.create();
 
 			@Test
-			void shouldCallHandleFileContent() {
+			void shouldCollectContent() {
 				observer.handleRepresentation(representationWithContent);
 
-				verify(observer).handleFileContent(GrpcRepresentationTestFactory.CONTENT);
+				verify(representationCollector).collect(GrpcRepresentationTestFactory.CONTENT);
 			}
 
 			@Test
@@ -338,549 +362,111 @@ class EingangStubReceiverStreamObserverTest {
 
 				verify(incomingFileMapper, never()).fromGrpcRepresentationFile(any());
 			}
-		}
-	}
-
-	@Nested
-	class TestSetCurrentMetadata {
-
-		private final IncomingFile incomingFile = IncomingFileTestFactory.create();
-
-		@Test
-		void shouldThrowIllegalStateExceptionIfCurrentFileIsSet() {
-			setCurrentFile(incomingFile);
-
-			assertThrows(IllegalStateException.class, () -> observer.setCurrentMetadata(incomingFile));
-		}
-
-		@Test
-		void shouldSetCurrentFile() {
-			observer.setCurrentMetadata(incomingFile);
-
-			assertThat(getCurrentFile()).isSameAs(incomingFile);
-		}
-	}
-
-	@Nested
-	class TestHandleFileContent {
-
-		private final GrpcFileContent fileContent = GrpcFileContentTestFactory.create();
-
-		@Nested
-		class TestOnReceivingFileContentIsNull {
-			@BeforeEach
-			void mock() {
-				doNothing().when(observer).initContentReceiving();
-				doNothing().when(observer).storeFileContent(any());
-			}
-
-			@Test
-			void shouldCallInitContentReceiving() {
-				observer.handleFileContent(fileContent);
-
-				verify(observer).initContentReceiving();
-			}
 
 			@Test
-			void shouldCallStoreFileContent() {
-				observer.handleFileContent(fileContent);
-
-				verify(observer).storeFileContent(fileContent);
-			}
-		}
-
-		@Nested
-		class TestOnReceivingFileContentIsNotNull {
-			@Mock
-			private CompletableFuture<File> receivingFileContent;
+			void shouldCallAddAsRepresentation() {
+				when(representationCollector.collect(any())).thenReturn(Optional.of(incomingFile));
 
-			@BeforeEach
-			void mock() {
-				doNothing().when(observer).storeFileContent(any());
-				setFileContent(receivingFileContent);
-			}
-
-			@Test
-			void shouldNotCallInitContentReceiving() {
-				observer.handleFileContent(fileContent);
-
-				verify(observer, never()).initContentReceiving();
-			}
-
-			@Test
-			void shouldCallStoreFileContent() {
-				observer.handleFileContent(fileContent);
+				observer.handleRepresentation(representationWithContent);
 
-				verify(observer).storeFileContent(fileContent);
+				verify(observer).addAsRepresentation(incomingFile);
 			}
-		}
-	}
-
-	@Nested
-	class TestInitContentReceiving {
-
-		private final byte[] content = new byte[] { 1, 2, 3 };
-
-		@Test
-		void shouldCreateInputStream() {
-			observer.initContentReceiving();
-
-			assertThat(getPipedInput()).isNotNull();
-		}
-
-		@Test
-		void shouldCreateOutputStream() {
-			observer.initContentReceiving();
-
-			assertThat(getPipedOutput()).isNotNull();
-		}
-
-		@Test
-		void shouldCreateConnectedStreams() {
-			observer.initContentReceiving();
-
-			verifyStreamSetUp();
-		}
-
-		@SneakyThrows
-		private void verifyStreamSetUp() {
-			var pipedInput = getPipedInput();
-			var pipedOutput = getPipedOutput();
-			pipedOutput.write(content);
-			pipedOutput.close();
-			var readBytes = pipedInput.readAllBytes();
-			assertThat(readBytes).isEqualTo(content);
-		}
-
-		@Test
-		void shouldCallFileSaver() {
-			observer.initContentReceiving();
-
-			verify(fileSaver).apply(getPipedInput());
-		}
-
-		@Test
-		void shouldSetReceivingFileContent() {
-			var fileFuture = CompletableFuture.completedFuture(mock(File.class));
-			when(fileSaver.apply(any())).thenReturn(fileFuture);
-
-			observer.initContentReceiving();
-
-			assertThat(getFileContent()).isSameAs(fileFuture);
-		}
-	}
-
-	@Nested
-	class TestStoreFileContent {
-		@Mock
-		private PipedOutputStream pipedOutput;
-
-		@BeforeEach
-		void setUp() {
-			setPipedOutput(pipedOutput);
-		}
-
-		@Nested
-		class TestOnCurrentFileIsNull {
 
 			@Test
-			void shouldThrowTechnicalException() {
-				var fileContent = GrpcFileContentTestFactory.create();
+			void shouldNotCallAddAsRepresentation() {
+				when(representationCollector.collect(any())).thenReturn(Optional.empty());
 
-				assertThrows(IllegalStateException.class, () -> observer.storeFileContent(fileContent));
-			}
-		}
-
-		@Nested
-		class TestOnCurrentFileIsNotNull {
-
-			private final IncomingFile incomingFile = IncomingFileTestFactory.create();
-
-			@BeforeEach
-			void mock() {
-				setCurrentFile(incomingFile);
-			}
-
-			@Nested
-			class TestOnEndOfFile {
-
-				private GrpcFileContent fileContent = GrpcFileContentTestFactory.createBuilder().setIsEndOfFile(true).build();
-
-				@BeforeEach
-				void setUp() {
-					doNothing().when(observer).handleEndOfFile();
-				}
-
-				@Test
-				void shouldCallHandleEndOfFile() {
-					observer.storeFileContent(fileContent);
-
-					verify(observer).handleEndOfFile();
-				}
-
-				@Test
-				@SneakyThrows
-				void shouldNotWriteContentToOutputStream() {
-					observer.storeFileContent(fileContent);
-
-					verify(pipedOutput, never()).write(any());
-				}
-			}
-
-			@Nested
-			class TestOnNotEndOfFile {
-
-				private GrpcFileContent fileContent = GrpcFileContentTestFactory.createBuilder().setIsEndOfFile(false).build();
-
-				@Test
-				@SneakyThrows
-				void shouldWriteContentToOutputStream() {
-					observer.storeFileContent(fileContent);
-
-					verify(pipedOutput).write(GrpcFileContentTestFactory.CONTENT);
-				}
-
-				@Test
-				void shouldNotCallHandleEndOfFile() {
-					observer.storeFileContent(fileContent);
-
-					verify(observer, never()).handleEndOfFile();
-				}
-
-				@Test
-				@SneakyThrows
-				void shouldThrowTechnicalExceptionOnIOException() {
-					doThrow(new IOException()).when(pipedOutput).write(any());
+				observer.handleRepresentation(representationWithContent);
 
-					assertThrows(TechnicalException.class, () -> {
-						observer.storeFileContent(fileContent);
-					});
-				}
+				verify(observer, never()).addAsRepresentation(any());
 			}
 		}
 	}
 
 	@Nested
-	class TestHandleEndOfFile {
+	class TestAddAsAttachment {
 
 		@Mock
-		private File savedFileContent;
+		private ContentCollector attachmentCollector;
 
-		private final IncomingFile incomingFile = IncomingFileTestFactory.createBuilder().file(null).build();
+		private final IncomingFile incomingFile = IncomingFileTestFactory.create();
 
 		@BeforeEach
 		void setUp() {
-			doNothing().when(observer).closeOutputPipe();
-			doReturn(savedFileContent).when(observer).getSavedFileContent();
-			setCurrentFile(incomingFile);
+			setRepresentationCollector(attachmentCollector);
+			setGroupName(GrpcAttachmentFileTestFactory.GROUP_NAME);
 		}
 
 		@Test
-		void shouldCallCloseOutputPipe() {
-			observer.handleEndOfFile();
-
-			verify(observer).closeOutputPipe();
-		}
-
-		@Nested
-		class TestOnGroupNameNull {
-
-			@BeforeEach
-			void setUp() {
-				setGroupName(null);
-			}
-
-			@Test
-			void shouldAddFileToRepresentations() {
-				var expectedIncomingFile = IncomingFileTestFactory.createBuilder().file(savedFileContent).build();
+		void shouldAddFileToAttachments() {
+			observer.addAsAttachment(incomingFile);
 
-				observer.handleEndOfFile();
-
-				assertThat(getRepresentations()).usingRecursiveFieldByFieldElementComparator().containsExactly(expectedIncomingFile);
-			}
-		}
-
-		@Nested
-		class TestOnGroupNameSet {
-
-			@BeforeEach
-			void setUp() {
-				setGroupName(GrpcAttachmentFileTestFactory.GROUP_NAME);
-			}
-
-			@Test
-			void shouldAddFileToAttachments() {
-				var expectedIncomingFile = IncomingFileTestFactory.createBuilder().file(savedFileContent).build();
-
-				observer.handleEndOfFile();
-
-				var attachmentGroup = getAttachments().get(GrpcAttachmentFileTestFactory.GROUP_NAME);
-				assertThat(attachmentGroup).usingRecursiveFieldByFieldElementComparator().containsExactly(expectedIncomingFile);
-			}
+			var attachmentGroup = getAttachments().get(GrpcAttachmentFileTestFactory.GROUP_NAME);
+			assertThat(attachmentGroup).usingRecursiveFieldByFieldElementComparator().containsExactly(incomingFile);
 		}
 
 		@Test
-		void shouldCallResetFileReceiving() {
-			observer.handleEndOfFile();
+		void shouldSetAttachmentCollectorToNull() {
+			observer.addAsAttachment(incomingFile);
 
-			verify(observer).resetFileReceiving();
+			assertThat(getAttachmentCollector()).isNull();
 		}
 	}
 
 	@Nested
-	class TestGetSavedFileContent {
-		@BeforeEach
-		void setUp() {
-			doNothing().when(observer).closeInputPipe();
-		}
+	class TestAddAsRepresentation {
 
-		@Nested
-		class TestOnNoExceptions {
-			@Mock
-			private File fileContent;
-
-			@BeforeEach
-			void setUp() {
-				setFileContent(CompletableFuture.completedFuture(fileContent));
-			}
-
-			@Test
-			void shouldReturnFile() {
-				var savedFileContent = observer.getSavedFileContent();
-
-				assertThat(savedFileContent).isSameAs(fileContent);
-			}
-
-			@Test
-			void shouldCallCloseInputPipe() {
-				observer.getSavedFileContent();
-
-				verify(observer).closeInputPipe();
-			}
-		}
-
-		@Nested
-		class TestOnExecutionException {
-
-			@BeforeEach
-			void setUp() {
-				setFileContent(CompletableFuture.failedFuture(new Exception()));
-			}
-
-			@Test
-			void shouldThrowTechnicalException() {
-				assertThrows(TechnicalException.class, () -> observer.getSavedFileContent());
-			}
-
-			@Test
-			void shouldCallCloseInputPipe() {
-				try {
-					observer.getSavedFileContent();
-				} catch (TechnicalException e) {
-					// expected
-				}
-
-				verify(observer).closeInputPipe();
-			}
-		}
-
-		@Nested
-		class TestOnTimeoutException {
-
-			@Mock
-			private CompletableFuture<File> fileFuture;
-
-			@BeforeEach
-			@SneakyThrows
-			void setUp() {
-				setFileContent(fileFuture);
-				when(fileFuture.get(anyLong(), any())).thenThrow(new TimeoutException());
-			}
-
-			@Test
-			void shouldThrowTechnicalException() {
-				assertThrows(TechnicalException.class, () -> observer.getSavedFileContent());
-			}
-
-			@Test
-			void shouldCallCloseInputPipe() {
-				try {
-					observer.getSavedFileContent();
-				} catch (TechnicalException e) {
-					// expected
-				}
-
-				verify(observer).closeInputPipe();
-			}
-		}
-
-		@Nested
-		class TestOnInterruptedException {
-
-			@Mock
-			private CompletableFuture<File> fileFuture;
-
-			@BeforeEach
-			@SneakyThrows
-			void setUp() {
-				setFileContent(fileFuture);
-				when(fileFuture.get(anyLong(), any())).thenThrow(new InterruptedException());
-			}
-
-			@Test
-			void shouldThrowTechnicalException() {
-				assertThrows(TechnicalException.class, () -> observer.getSavedFileContent());
-			}
-
-			@Test
-			void shouldInterruptCurrentThread() {
-				try {
-					observer.getSavedFileContent();
-				} catch (TechnicalException e) {
-					// expected
-				}
-
-				assertThat(Thread.currentThread().isInterrupted()).isTrue();
-			}
-
-			@Test
-			void shouldCallCloseInputPipe() {
-				try {
-					observer.getSavedFileContent();
-				} catch (TechnicalException e) {
-					// expected
-				}
-
-				verify(observer).closeInputPipe();
-			}
-		}
-	}
+		@Mock
+		private ContentCollector representationCollector;
 
-	@Nested
-	class TestResetFielReceiving {
+		private final IncomingFile incomingFile = IncomingFileTestFactory.create();
 
 		@BeforeEach
 		void setUp() {
-			setCurrentFile(IncomingFileTestFactory.create());
-			setGroupName(GrpcAttachmentFileTestFactory.GROUP_NAME);
-			setPipedOutput(mock(PipedOutputStream.class));
-			setPipedOutput(mock(PipedOutputStream.class));
-			setFileContent(CompletableFuture.completedFuture(mock(File.class)));
-		}
-
-		@Test
-		void shouldResetCurrentFile() {
-			observer.resetFileReceiving();
-
-			assertThat(getCurrentFile()).isNull();
-		}
-
-		@Test
-		void shouldResetGroupName() {
-			observer.resetFileReceiving();
-
-			assertThat(getGroupName()).isNull();
-		}
-
-		@Test
-		void shouldResetPipedOutput() {
-			observer.resetFileReceiving();
-
-			assertThat(getPipedOutput()).isNull();
+			setRepresentationCollector(representationCollector);
 		}
 
 		@Test
-		void shouldResetPipedInput() {
-			observer.resetFileReceiving();
+		void shouldAddFileToRepresentations() {
+			observer.addAsRepresentation(incomingFile);
 
-			assertThat(getPipedInput()).isNull();
+			assertThat(getRepresentations()).usingRecursiveFieldByFieldElementComparator().containsExactly(incomingFile);
 		}
 
 		@Test
-		void shouldResetReceivingFileContent() {
-			observer.resetFileReceiving();
+		void shouldSetRepresentationCollectorToNull() {
+			observer.addAsRepresentation(incomingFile);
 
-			assertThat(getFileContent()).isNull();
+			assertThat(getRepresentationCollector()).isNull();
 		}
 	}
 
 	@Nested
 	class TestOnError {
 
-		@BeforeEach
-		void mock() {
-			doNothing().when(observer).closeOutputPipe();
-			doNothing().when(observer).closeInputPipe();
-		}
-
-		@Test
-		void shouldCallCloseOutputPipe() {
-			observer.onError(new Exception());
-
-			verify(observer).closeOutputPipe();
-		}
-
-		@Test
-		void shouldCallCloseInputPipe() {
-			observer.onError(new Exception());
-
-			verify(observer).closeInputPipe();
-		}
-	}
-
-	@Nested
-	class TestCloseOutputPipe {
-
 		@Mock
-		private PipedOutputStream pipedOutput;
-
-		@BeforeEach
-		void setUp() {
-			setPipedOutput(pipedOutput);
-		}
-
-		@Test
-		@SneakyThrows
-		void shouldClosePipedOutput() {
-			observer.closeOutputPipe();
-
-			verify(pipedOutput).close();
-		}
-
-		@Test
-		@SneakyThrows
-		void shouldNotThrowException() {
-			doThrow(IOException.class).when(pipedOutput).close();
-
-			assertDoesNotThrow(() -> observer.closeOutputPipe());
-		}
-	}
-
-	@Nested
-	class TestCloseInputPipe {
-
+		private ContentCollector attachmentCollector;
 		@Mock
-		private PipedInputStream pipedInput;
+		private ContentCollector representationCollector;
 
 		@BeforeEach
-		void setUp() {
-			setPipedInput(pipedInput);
+		void mock() {
+			setAttachmentCollector(attachmentCollector);
+			setRepresentationCollector(representationCollector);
 		}
 
 		@Test
-		@SneakyThrows
-		void shouldClosePipedInput() {
-			observer.closeInputPipe();
+		void shouldCloseAttachmentCollector() {
+			observer.onError(new Exception());
 
-			verify(pipedInput).close();
+			verify(attachmentCollector).close();
 		}
 
 		@Test
-		@SneakyThrows
-		void shouldNotThrowException() {
-			doThrow(IOException.class).when(pipedInput).close();
+		void shouldCloseRepresentationCollector() {
+			observer.onError(new Exception());
 
-			assertDoesNotThrow(() -> observer.closeInputPipe());
+			verify(representationCollector).close();
 		}
 	}
 
@@ -987,39 +573,6 @@ class EingangStubReceiverStreamObserverTest {
 		ReflectionTestUtils.setField(observer, "groupName", groupName);
 	}
 
-	private IncomingFile getCurrentFile() {
-		return (IncomingFile) ReflectionTestUtils.getField(observer, "currentFile");
-	}
-
-	private void setCurrentFile(IncomingFile incomingFile) {
-		ReflectionTestUtils.setField(observer, "currentFile", incomingFile);
-	}
-
-	private void setFileContent(CompletableFuture<File> fileFuture) {
-		ReflectionTestUtils.setField(observer, "receivingFileContent", fileFuture);
-	}
-
-	@SuppressWarnings("unchecked")
-	private CompletableFuture<File> getFileContent() {
-		return (CompletableFuture<File>) ReflectionTestUtils.getField(observer, "receivingFileContent");
-	}
-
-	private PipedInputStream getPipedInput() {
-		return (PipedInputStream) ReflectionTestUtils.getField(observer, "pipedInput");
-	}
-
-	private void setPipedInput(PipedInputStream pipedInput) {
-		ReflectionTestUtils.setField(observer, "pipedInput", pipedInput);
-	}
-
-	private PipedOutputStream getPipedOutput() {
-		return (PipedOutputStream) ReflectionTestUtils.getField(observer, "pipedOutput");
-	}
-
-	private void setPipedOutput(PipedOutputStream pipedOutput) {
-		ReflectionTestUtils.setField(observer, "pipedOutput", pipedOutput);
-	}
-
 	@SuppressWarnings("unchecked")
 	private List<IncomingFile> getRepresentations() {
 		return (List<IncomingFile>) ReflectionTestUtils.getField(observer, "representations");
@@ -1038,4 +591,20 @@ class EingangStubReceiverStreamObserverTest {
 		ReflectionTestUtils.setField(observer, "attachments", attachments);
 	}
 
+	private ContentCollector getAttachmentCollector() {
+		return (ContentCollector) ReflectionTestUtils.getField(observer, "attachmentCollector");
+	}
+
+	private void setAttachmentCollector(ContentCollector attachmentCollector) {
+		ReflectionTestUtils.setField(observer, "attachmentCollector", attachmentCollector);
+	}
+
+	private ContentCollector getRepresentationCollector() {
+		return (ContentCollector) ReflectionTestUtils.getField(observer, "representationCollector");
+	}
+
+	private void setRepresentationCollector(ContentCollector representationCollector) {
+		ReflectionTestUtils.setField(observer, "representationCollector", representationCollector);
+	}
+
 }
-- 
GitLab