diff --git a/pom.xml b/pom.xml
index 305c0567c014db2b90dc8e285ee020fa1e5b5138..6e56667367ff1c879f915f17b99e18c4d826e3a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -41,7 +41,7 @@
 	<inceptionYear>2024</inceptionYear>
 
 	<properties>
-		<ozgcloud.api-lib.version>0.18.0</ozgcloud.api-lib.version>
+		<ozgcloud.api-lib.version>0.19.0-SNAPSHOT</ozgcloud.api-lib.version>
 		<jslt.version>0.1.14</jslt.version>
 		<hibernate-validator.version>8.0.2.Final</hibernate-validator.version>
 		<spring-boot.build-image.imageName>docker.ozg-sh.de/aggregation-manager:build-latest</spring-boot.build-image.imageName>
diff --git a/src/main/java/de/ozgcloud/aggregation/AggregationManagerRunner.java b/src/main/java/de/ozgcloud/aggregation/AggregationManagerRunner.java
index 3629c317ecacf3a46c778bab7472af4c11fea46b..ccfe40139c90b6f80ff097426b707ad7515f42ce 100644
--- a/src/main/java/de/ozgcloud/aggregation/AggregationManagerRunner.java
+++ b/src/main/java/de/ozgcloud/aggregation/AggregationManagerRunner.java
@@ -25,6 +25,7 @@ package de.ozgcloud.aggregation;
 
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.function.IntFunction;
@@ -40,6 +41,7 @@ import org.springframework.boot.CommandLineRunner;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
+import de.ozgcloud.aggregation.transformation.AggregationMapping.FormIdentifier;
 import de.ozgcloud.aggregation.transformation.Transformation;
 import de.ozgcloud.aggregation.transformation.TransformationException;
 import de.ozgcloud.aggregation.transformation.TransformationService;
@@ -48,6 +50,7 @@ import de.ozgcloud.aggregation.warehouse.DocumentEntry;
 import de.ozgcloud.aggregation.warehouse.WarehouseRepository;
 import de.ozgcloud.apilib.vorgang.OzgCloudVorgang;
 import de.ozgcloud.apilib.vorgang.OzgCloudVorgangQuery;
+import de.ozgcloud.apilib.vorgang.OzgCloudVorgangQuery.FormIdentification;
 import de.ozgcloud.apilib.vorgang.OzgCloudVorgangService;
 import de.ozgcloud.apilib.vorgang.OzgCloudVorgangStub;
 import de.ozgcloud.apilib.vorgang.Page;
@@ -80,45 +83,57 @@ public class AggregationManagerRunner implements CommandLineRunner {
 		var identifier = transformationProperties.getIdentifier();
 		var aggregationMappings = transformationProperties.getAggregationMappings();
 		if (Objects.isNull(aggregationMappings) || aggregationMappings.isEmpty()) {
-			runWithTransformation(transformationService.load(identifier, null));
+			runWithTransformation(transformationService.load(identifier, null), Optional.empty());
 		} else {
 			aggregationMappings.stream()
-					.map(aggregationMapping -> transformationService.load(identifier, aggregationMapping))
-					.forEach(this::runWithTransformation);
+					.forEach(aggregationMapping -> runWithTransformation(transformationService.load(identifier, aggregationMapping),
+							Optional.of(aggregationMapping.getFormIdentifier())));
 		}
 	}
 
-	void runWithTransformation(Transformation transformation) {
+	void runWithTransformation(Transformation transformation, Optional<FormIdentifier> formIdentifier) {
 		try (Execution execution = new Execution(transformation)) {
 			ThreadContext.put(MDC_EXECUTION, execution.id.toString());
 			loadVorgaengeIntoRepository(Stream.concat(
-					extractBatchesOfVorgaengeFromDataSource(execution),
-					extractBatchesOfDeletedVorgaengeFromDataSource(execution)));
+					extractBatchesOfVorgaengeFromDataSource(execution, formIdentifier),
+					extractBatchesOfDeletedVorgaengeFromDataSource(execution, formIdentifier)));
 		} finally {
 			ThreadContext.remove(MDC_EXECUTION);
 		}
 	}
 
 	void loadVorgaengeIntoRepository(Stream<Batch> batches) {
+		repository.deleteAll();
 		batches.map(this::transformBatchToDocumentEntries).forEach(this::loadDocumentEntriesIntoRepository);
 	}
 
-	Stream<Batch> extractBatchesOfVorgaengeFromDataSource(Execution execution) {
-		return extractBatchesFromDataSource(execution, this::getVorgaengeFromDataSource);
+	Stream<Batch> extractBatchesOfVorgaengeFromDataSource(Execution execution, Optional<FormIdentifier> formIdentifier) {
+		return extractBatchesFromDataSource(execution, page -> getVorgaengeFromDataSource(page, formIdentifier));
 	}
 
-	List<OzgCloudVorgang> getVorgaengeFromDataSource(Page page) {
-		return vorgangService.find(buildFindAllQuery(), page).stream()
+	List<OzgCloudVorgang> getVorgaengeFromDataSource(Page page, Optional<FormIdentifier> formIdentifier) {
+		return vorgangService.find(buildFindByFormEngineQuery(formIdentifier), page).stream()
 				.map(vorgangStub -> vorgangService.getById(vorgangStub.getId()))
 				.toList();
 	}
 
-	OzgCloudVorgangQuery buildFindAllQuery() {
-		return OzgCloudVorgangQuery.builder().build();
+	OzgCloudVorgangQuery buildFindByFormEngineQuery(Optional<FormIdentifier> formIdentifier) {
+		return OzgCloudVorgangQuery.builder()
+				.form(mapToFormIdentification(formIdentifier))
+				.build();
 	}
 
-	Stream<Batch> extractBatchesOfDeletedVorgaengeFromDataSource(Execution execution) {
-		return extractBatchesFromDataSource(execution, getPagedDeletedVorgaenge(vorgangService.findDeleted()));
+	private Optional<FormIdentification> mapToFormIdentification(Optional<FormIdentifier> formIdentifier) {
+		return formIdentifier
+				.map(identifier -> FormIdentification.builder()
+						.formId(identifier.getFormId())
+						.formEngineName(identifier.getFormEngineName())
+						.build());
+	}
+
+	Stream<Batch> extractBatchesOfDeletedVorgaengeFromDataSource(Execution execution, Optional<FormIdentifier> formIdentifier) {
+		return formIdentifier.isEmpty() ? extractBatchesFromDataSource(execution, getPagedDeletedVorgaenge(vorgangService.findDeleted()))
+				: Stream.empty();
 	}
 
 	Function<Page, List<OzgCloudVorgang>> getPagedDeletedVorgaenge(Stream<OzgCloudVorgangStub> allDeletedVorgaenge) {
diff --git a/src/test/java/de/ozgcloud/aggregation/AggregationManagerRunnerTest.java b/src/test/java/de/ozgcloud/aggregation/AggregationManagerRunnerTest.java
index b9e7f4c3aa06073458d6485d9281d0af9d427ca2..1e578c804c4a8caa7022cce046ec08f37c059fcc 100644
--- a/src/test/java/de/ozgcloud/aggregation/AggregationManagerRunnerTest.java
+++ b/src/test/java/de/ozgcloud/aggregation/AggregationManagerRunnerTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.*;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.UUID;
 import java.util.function.Function;
 import java.util.stream.Stream;
@@ -48,7 +49,9 @@ import com.thedeanda.lorem.LoremIpsum;
 import de.ozgcloud.aggregation.AggregationManagerRunner.Batch;
 import de.ozgcloud.aggregation.AggregationManagerRunner.Execution;
 import de.ozgcloud.aggregation.transformation.AggregationMapping;
+import de.ozgcloud.aggregation.transformation.AggregationMapping.FormIdentifier;
 import de.ozgcloud.aggregation.transformation.AggregationMappingTestFactory;
+import de.ozgcloud.aggregation.transformation.FormIdentifierTestFactory;
 import de.ozgcloud.aggregation.transformation.Transformation;
 import de.ozgcloud.aggregation.transformation.TransformationService;
 import de.ozgcloud.aggregation.transformation.VorgangMapper;
@@ -88,7 +91,7 @@ class AggregationManagerRunnerTest {
 		@BeforeEach
 		void mock() {
 			when(transformationProperties.getIdentifier()).thenReturn(identifier);
-			doNothing().when(runner).runWithTransformation(any());
+			doNothing().when(runner).runWithTransformation(any(), any());
 		}
 
 		@Test
@@ -132,8 +135,8 @@ class AggregationManagerRunnerTest {
 			void shouldRunWithTransformationForEachTransformation() {
 				runner.run();
 
-				verify(runner).runWithTransformation(firstTransformation);
-				verify(runner).runWithTransformation(secondTransformation);
+				verify(runner).runWithTransformation(firstTransformation, Optional.of(AggregationMappingTestFactory.FORM_IDENTIFIER));
+				verify(runner).runWithTransformation(secondTransformation, Optional.of(AggregationMappingTestFactory.FORM_IDENTIFIER));
 			}
 		}
 
@@ -159,7 +162,7 @@ class AggregationManagerRunnerTest {
 			void shouldCallRunWithTransformation() {
 				runner.run();
 
-				verify(runner).runWithTransformation(transformation);
+				verify(runner).runWithTransformation(transformation, Optional.empty());
 			}
 		}
 
@@ -185,7 +188,7 @@ class AggregationManagerRunnerTest {
 			void shouldCallRunWithTransformation() {
 				runner.run();
 
-				verify(runner).runWithTransformation(transformation);
+				verify(runner).runWithTransformation(transformation, Optional.empty());
 			}
 		}
 	}
@@ -196,6 +199,7 @@ class AggregationManagerRunnerTest {
 		@Mock
 		private Transformation transformation;
 		private final ArgumentMatcher<Execution> hasTransformation = execution -> execution.getTransformation().equals(transformation);
+		private final Optional<FormIdentifier> formIdentifier = Optional.of(FormIdentifierTestFactory.create());
 		@Mock
 		private Batch batchOfVorgaenge;
 		@Mock
@@ -205,28 +209,28 @@ class AggregationManagerRunnerTest {
 
 		@BeforeEach
 		void init() {
-			doReturn(Stream.of(batchOfVorgaenge)).when(runner).extractBatchesOfVorgaengeFromDataSource(any());
-			doReturn(Stream.of(batchOfDeletedVorgaenge)).when(runner).extractBatchesOfDeletedVorgaengeFromDataSource(any());
+			doReturn(Stream.of(batchOfVorgaenge)).when(runner).extractBatchesOfVorgaengeFromDataSource(any(), any());
+			doReturn(Stream.of(batchOfDeletedVorgaenge)).when(runner).extractBatchesOfDeletedVorgaengeFromDataSource(any(), any());
 			doNothing().when(runner).loadVorgaengeIntoRepository(any());
 		}
 
 		@Test
 		void shouldExtractBatchesOfVorgaengeFromDataSource() {
-			runner.runWithTransformation(transformation);
+			runner.runWithTransformation(transformation, formIdentifier);
 
-			verify(runner).extractBatchesOfVorgaengeFromDataSource(argThat(hasTransformation));
+			verify(runner).extractBatchesOfVorgaengeFromDataSource(argThat(hasTransformation), eq(formIdentifier));
 		}
 
 		@Test
 		void shouldExtractBatchesOfDeletedVorgaengeFromDataSource() {
-			runner.runWithTransformation(transformation);
+			runner.runWithTransformation(transformation, formIdentifier);
 
-			verify(runner).extractBatchesOfDeletedVorgaengeFromDataSource(argThat(hasTransformation));
+			verify(runner).extractBatchesOfDeletedVorgaengeFromDataSource(argThat(hasTransformation), eq(formIdentifier));
 		}
 
 		@Test
 		void shouldLoadVorgaengeIntoRepository() {
-			runner.runWithTransformation(transformation);
+			runner.runWithTransformation(transformation, formIdentifier);
 
 			verify(runner).loadVorgaengeIntoRepository(batchStreamCaptor.capture());
 			assertThat(batchStreamCaptor.getValue()).containsExactly(batchOfVorgaenge, batchOfDeletedVorgaenge);
@@ -250,6 +254,13 @@ class AggregationManagerRunnerTest {
 			doNothing().when(runner).loadDocumentEntriesIntoRepository(any());
 		}
 
+		@Test
+		void shouldDropCollection() {
+			loadVorgaengeIntoRepository();
+
+			verify(repository).deleteAll();
+		}
+
 		@Test
 		void shouldTransform() {
 			loadVorgaengeIntoRepository();
@@ -273,6 +284,7 @@ class AggregationManagerRunnerTest {
 	@Nested
 	class TestExtractBatchesOfVorgaengeFromDataSource {
 
+		private final Optional<FormIdentifier> formIdentifier = Optional.of(FormIdentifierTestFactory.create());
 		@Mock
 		private Execution execution;
 		@Captor
@@ -289,23 +301,23 @@ class AggregationManagerRunnerTest {
 
 		@Test
 		void shouldExtract() {
-			runner.extractBatchesOfVorgaengeFromDataSource(execution);
+			runner.extractBatchesOfVorgaengeFromDataSource(execution, formIdentifier);
 
 			verify(runner).extractBatchesFromDataSource(eq(execution), any());
 		}
 
 		@Test
 		void shouldExtractWithDataRetrievalFunction() {
-			runner.extractBatchesOfVorgaengeFromDataSource(execution);
+			runner.extractBatchesOfVorgaengeFromDataSource(execution, formIdentifier);
 
 			verify(runner).extractBatchesFromDataSource(eq(execution), functionToRetrieveDataCaptor.capture());
 			functionToRetrieveDataCaptor.getValue().apply(page);
-			verify(runner).getVorgaengeFromDataSource(page);
+			verify(runner).getVorgaengeFromDataSource(page, formIdentifier);
 		}
 
 		@Test
 		void shouldReturnExtractedBatches() {
-			var extracted = runner.extractBatchesOfVorgaengeFromDataSource(execution);
+			var extracted = runner.extractBatchesOfVorgaengeFromDataSource(execution, formIdentifier);
 
 			assertThat(extracted).containsExactly(batch);
 		}
@@ -315,15 +327,23 @@ class AggregationManagerRunnerTest {
 	class TestGetVorgaengeFromDataSource {
 
 		private final Page page = Page.builder().offset(10).limit(2).build();
+		private final Optional<FormIdentifier> formIdentifier = Optional.of(FormIdentifierTestFactory.create());
 		private final OzgCloudVorgangQuery query = OzgCloudVorgangQuery.builder().build();
 
 		@BeforeEach
 		void init() {
-			doReturn(query).when(runner).buildFindAllQuery();
+			doReturn(query).when(runner).buildFindByFormEngineQuery(any());
 			when(vorgangService.find(any(), any())).thenReturn(List.of(OzgCloudVorgangStubTestFactory.create()));
 			when(vorgangService.getById(any())).thenReturn(OzgCloudVorgangTestFactory.create());
 		}
 
+		@Test
+		void shouldCallBuildFindByFormEngineQuery() {
+			getVorgaengeFromDataSource();
+
+			verify(runner).buildFindByFormEngineQuery(formIdentifier);
+		}
+
 		@Test
 		void shouldCallVorgangService() {
 			getVorgaengeFromDataSource();
@@ -346,7 +366,42 @@ class AggregationManagerRunnerTest {
 		}
 
 		private List<OzgCloudVorgang> getVorgaengeFromDataSource() {
-			return runner.getVorgaengeFromDataSource(page);
+			return runner.getVorgaengeFromDataSource(page, formIdentifier);
+		}
+	}
+
+	@Nested
+	class TestBuildFindByFormEngineQuery {
+
+		@Nested
+		class TestOnEmptyFormIdentifier {
+
+			@Test
+			void shouldReturnFindAllQueryOnEmptyFormIdentifier() {
+				var query = runner.buildFindByFormEngineQuery(Optional.empty());
+
+				assertThat(query).usingRecursiveComparison().isEqualTo(OzgCloudVorgangQuery.builder().build());
+			}
+		}
+
+		@Nested
+		class TestOnFormIdentifierNotEmpty {
+
+			private final Optional<FormIdentifier> formIdentifier = Optional.of(FormIdentifierTestFactory.create());
+
+			@Test
+			void shouldSetFormIdInQuery() {
+				var query = runner.buildFindByFormEngineQuery(formIdentifier);
+
+				assertThat(query.getForm()).get().extracting("formId").isEqualTo(FormIdentifierTestFactory.FORM_ID);
+			}
+
+			@Test
+			void shouldSetFormEngineNameInQuery() {
+				var query = runner.buildFindByFormEngineQuery(formIdentifier);
+
+				assertThat(query.getForm()).get().extracting("formEngineName").isEqualTo(FormIdentifierTestFactory.FORM_ENGINE_NAME);
+			}
 		}
 	}
 
@@ -363,40 +418,58 @@ class AggregationManagerRunnerTest {
 		@Mock
 		private Batch batch;
 
-		@BeforeEach
-		void init() {
-			when(vorgangService.findDeleted()).thenReturn(deletedVorgaenge.stream());
-			doReturn(functionToRetrieveData).when(runner).getPagedDeletedVorgaenge(any());
-			doReturn(Stream.of(batch)).when(runner).extractBatchesFromDataSource(any(), any());
-		}
+		@Nested
+		class TestOnEmptyFormIdentifier {
 
-		@Test
-		void shouldFindDeleted() {
-			runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution);
+			@BeforeEach
+			void init() {
+				when(vorgangService.findDeleted()).thenReturn(deletedVorgaenge.stream());
+				doReturn(functionToRetrieveData).when(runner).getPagedDeletedVorgaenge(any());
+				doReturn(Stream.of(batch)).when(runner).extractBatchesFromDataSource(any(), any());
+			}
 
-			verify(vorgangService).findDeleted();
-		}
+			@Test
+			void shouldFindDeleted() {
+				runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution, Optional.empty());
 
-		@Test
-		void shouldGetPagedDeletedVorgaenge() {
-			runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution);
+				verify(vorgangService).findDeleted();
+			}
 
-			verify(runner).getPagedDeletedVorgaenge(deletedVorgaengeCaptor.capture());
-			assertThat(deletedVorgaengeCaptor.getValue()).usingRecursiveFieldByFieldElementComparator().containsExactlyElementsOf(deletedVorgaenge);
-		}
+			@Test
+			void shouldGetPagedDeletedVorgaenge() {
+				runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution, Optional.empty());
 
-		@Test
-		void shouldExtractWithDataRetrievalFunction() {
-			runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution);
+				verify(runner).getPagedDeletedVorgaenge(deletedVorgaengeCaptor.capture());
+				assertThat(deletedVorgaengeCaptor.getValue()).usingRecursiveFieldByFieldElementComparator()
+						.containsExactlyElementsOf(deletedVorgaenge);
+			}
+
+			@Test
+			void shouldExtractWithDataRetrievalFunction() {
+				runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution, Optional.empty());
+
+				verify(runner).extractBatchesFromDataSource(execution, functionToRetrieveData);
+			}
+
+			@Test
+			void shouldReturnExtractedBatches() {
+				var extracted = runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution, Optional.empty());
 
-			verify(runner).extractBatchesFromDataSource(execution, functionToRetrieveData);
+				assertThat(extracted).containsExactly(batch);
+			}
 		}
 
-		@Test
-		void shouldReturnExtractedBatches() {
-			var extracted = runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution);
+		@Nested
+		class TestOnFormIdentifierNotEmpty {
 
-			assertThat(extracted).containsExactly(batch);
+			private final Optional<FormIdentifier> formIdentifier = Optional.of(FormIdentifierTestFactory.create());
+
+			@Test
+			void shouldReturnEmptyStream() {
+				var extracted = runner.extractBatchesOfDeletedVorgaengeFromDataSource(execution, formIdentifier);
+
+				assertThat(extracted).isEmpty();
+			}
 		}
 	}
 
@@ -483,8 +556,7 @@ class AggregationManagerRunnerTest {
 
 			assertThat(batches).hasSize(2).usingRecursiveFieldByFieldElementComparator().containsExactly(
 					new Batch(execution, BATCH_1_UUID, Page.builder().offset(0).limit(BATCH_SIZE).build(), VORGAENGE_1),
-					new Batch(execution, BATCH_2_UUID, Page.builder().offset(BATCH_SIZE).limit(BATCH_SIZE).build(), VORGAENGE_2)
-			);
+					new Batch(execution, BATCH_2_UUID, Page.builder().offset(BATCH_SIZE).limit(BATCH_SIZE).build(), VORGAENGE_2));
 		}
 
 		private ArgumentMatcher<Page> hasExpectedOffsetAndConfiguredLimit(int expectedOffset) {