How can I limit the total number of database entries read by RepositoryItemReader of my Spring Batch Multithreaded Step process?
My multithreaded Spring Batch step is behaving almost erratically. I haven't been able to discern any pattern in the way it fails. Sometimes it reads and writes too many records from the database and sometimes it doesn't read enough.
I'm using a RepositoryItemReader to execute a custom native query. I've defined a countQuery for it, and I've used the reader's setMaxItemCount(totalLimit) method, but the reader seems to treat that as more of a suggestion than an actual hard maximum. With a thread count of 4 and just 1 intentionally bad record that causes 1 skip in the processor logic, I've seen...
limit | pageSize | chunkSize || actual writes
100   | 10       | 5         || 110 unique writes
800   | 100      | 25        || 804 unique writes, and 37 duplicate writes (WHY?)
800   | 100      | 25        || 663 unique writes, and 165 duplicate writes (WHYYYY???)
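From what I can tell, the reader's maxItemCount bookkeeping isn't atomic across threads, so one mitigation I've seen suggested is to wrap the reader in Spring Batch's SynchronizedItemStreamReader, which serializes concurrent read() calls. A minimal sketch (not part of my current configuration; it assumes the dbReader bean shown further down):

@Bean
@StepScope
public SynchronizedItemStreamReader<MyProjection> synchronizedDbReader(ItemStreamReader<MyProjection> dbReader) {
    // Serialize read() across the step's threads; the delegate is the
    // RepositoryItemReader bean defined in the job config below.
    SynchronizedItemStreamReader<MyProjection> wrapper = new SynchronizedItemStreamReader<>();
    wrapper.setDelegate(dbReader);
    return wrapper;
}

The step's .reader(...) would then point at this wrapper instead of the raw dbReader.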
My project is using Spring Boot 2.1.11.RELEASE, and it looks like the version of spring-batch-infrastructure being pulled in is 4.1.3.RELEASE. Does anyone have any idea why Spring Batch performs either too many or duplicate writes when just 1 skip occurs on one of the pages?
Maybe it has something to do with the way I've configured my in-memory JobRepository...
Here's my repository class:
@Repository
public interface MyEntityRepository extends JpaRepository<MyEntity, Integer> {

    // Trailing spaces matter here: without them the concatenated SQL
    // fragments run together (e.g. "my_entity eleft join").
    String FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE = "from {h-schema}my_entity e " +
            "left join {h-schema}another_table a " +
            "on e.fk = a.pk ";

    @Query(
            value = "select e.id, e.name, a.additional_info " +
                    FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE +
                    "where e.status <> :status and e.add_date < :date",
            countQuery = "select count(*) " +
                    FROM_MY_ENTITY_TABLE_LEFT_JOINED_WITH_ANOTHER_TABLE +
                    "where e.status <> :status and e.add_date < :date",
            nativeQuery = true)
    Page<MyProjection> findMyProjectionsWithoutStatusBeforeDate(@Param("status") String status,
                                                                @Param("date") Date date,
                                                                Pageable page);
}
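For reference, each page fetch by RepositoryItemReader boils down to a call roughly like this (a hypothetical manual invocation; the literal values stand in for the reader's configured arguments):

// Roughly what RepositoryItemReader does per page; values are stand-ins.
int pageNumber = 0;          // the reader advances this page by page
int pageSize = 100;          // same value later passed to setPageSize(...)
Date startTime = new Date(); // same value bound from jobParameters
PageRequest request = PageRequest.of(pageNumber, pageSize, Sort.by(Sort.Direction.ASC, "e.id"));
Page<MyProjection> result = myEntityRepository
        .findMyProjectionsWithoutStatusBeforeDate("REMOVED", startTime, request);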
And here's how I've configured my job:
@Configuration
public class ConversionBatchJobConfig {

    @Bean
    public SimpleCompletionPolicy processChunkSize(@Value("${commit.chunk.size:5}") Integer chunkSize) {
        return new SimpleCompletionPolicy(chunkSize);
    }

    @Bean
    @StepScope
    public ItemStreamReader<MyProjection> dbReader(
            MyEntityRepository myEntityRepository,
            @Value("#{jobParameters[startTime]}") Date startTime,
            @Value("#{jobParameters[pageSize]}") Integer pageSize,
            @Value("#{jobParameters[limit]}") Integer limit) {
        RepositoryItemReader<MyProjection> myProjectionRepositoryReader = new RepositoryItemReader<>();
        myProjectionRepositoryReader.setRepository(myEntityRepository);
        myProjectionRepositoryReader.setMethodName("findMyProjectionsWithoutStatusBeforeDate");
        myProjectionRepositoryReader.setArguments(new ArrayList<Object>() {{
            add("REMOVED");
            add(startTime);
        }});
        myProjectionRepositoryReader.setSort(new HashMap<String, Sort.Direction>() {{
            put("e.id", Sort.Direction.ASC);
        }});
        myProjectionRepositoryReader.setPageSize(pageSize);
        myProjectionRepositoryReader.setMaxItemCount(limit);
        myProjectionRepositoryReader.setSaveState(false);
        return myProjectionRepositoryReader;
    }

    @Bean
    @StepScope
    public ItemProcessor<MyProjection, JsonMessage> dataConverter(AdditionalDbDataRetrievalService dataRetrievalService) {
        return new MyProjectionToJsonMessageConverter(dataRetrievalService); // <== simple ItemProcessor implementation
    }

    @Bean
    @StepScope
    public ItemWriter<JsonMessage> jsonPublisher(GcpPubsubPublisherService publisherService) {
        return new JsonMessageWriter(publisherService); // <== simple ItemWriter implementation
    }

    @Bean
    public Step conversionProcess(SimpleCompletionPolicy processChunkSize,
                                  ItemStreamReader<MyProjection> dbReader,
                                  ItemProcessor<MyProjection, JsonMessage> dataConverter,
                                  ItemWriter<JsonMessage> jsonPublisher,
                                  StepBuilderFactory stepBuilderFactory,
                                  TaskExecutor conversionThreadPool,
                                  @Value("${conversion.failure.limit:20}") int maximumFailures) {
        return stepBuilderFactory.get("conversionProcess")
                .<MyProjection, JsonMessage>chunk(processChunkSize)
                .reader(dbReader)
                .processor(dataConverter)
                .writer(jsonPublisher)
                .faultTolerant()
                .skipPolicy(new MyCustomConversionSkipPolicy(maximumFailures))
                // ^ for now this returns true for everything until 20 failures
                .listener(new MyConversionSkipListener(processStatus))
                // ^ for now this just logs the error
                .taskExecutor(conversionThreadPool)
                .build();
    }

    @Bean
    public Job conversionJob(Step conversionProcess,
                             JobBuilderFactory jobBuilderFactory) {
        return jobBuilderFactory.get("conversionJob")
                .start(conversionProcess)
                .build();
    }
}
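The conversionThreadPool bean referenced above isn't shown; a minimal sketch of a typical definition (an assumption, sized to the thread count of 4 mentioned earlier, not the actual bean from my project):

@Bean
public TaskExecutor conversionThreadPool() {
    // Hypothetical pool definition; 4 threads to match the behavior described above.
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(4);
    executor.setMaxPoolSize(4);
    executor.setThreadNamePrefix("conversion-");
    executor.initialize();
    return executor;
}

Note that the step's default throttleLimit is also 4, so at most 4 chunks are processed concurrently regardless of pool size.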
And here's how I've configured my in-memory Job Repository:
@Configuration
@EnableBatchProcessing
public class InMemoryBatchManagementConfig {

    @Bean
    public ResourcelessTransactionManager resourcelessTransactionManager() {
        return new ResourcelessTransactionManager();
    }

    @Bean
    public MapJobRepositoryFactoryBean mapJobRepositoryFactory(ResourcelessTransactionManager resourcelessTransactionManager)
            throws Exception {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean(resourcelessTransactionManager);
        factory.afterPropertiesSet();
        return factory;
    }

    @Bean
    public JobRepository jobRepository(MapJobRepositoryFactoryBean factory) throws Exception {
        return factory.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository);
        launcher.afterPropertiesSet();
        return launcher;
    }

    @Bean
    public JobExplorer jobExplorer(MapJobRepositoryFactoryBean factory) {
        return new SimpleJobExplorer(factory.getJobInstanceDao(), factory.getJobExecutionDao(),
                factory.getStepExecutionDao(), factory.getExecutionContextDao());
    }

    @Bean
    public BatchConfigurer batchConfigurer(MapJobRepositoryFactoryBean mapJobRepositoryFactory,
                                           ResourcelessTransactionManager resourcelessTransactionManager,
                                           SimpleJobLauncher jobLauncher,
                                           JobExplorer jobExplorer) {
        return new BatchConfigurer() {
            @Override
            public JobRepository getJobRepository() throws Exception {
                return mapJobRepositoryFactory.getObject();
            }

            @Override
            public PlatformTransactionManager getTransactionManager() throws Exception {
                return resourcelessTransactionManager;
            }

            @Override
            public JobLauncher getJobLauncher() throws Exception {
                return jobLauncher;
            }

            @Override
            public JobExplorer getJobExplorer() throws Exception {
                return jobExplorer;
            }
        };
    }
}
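In case it matters, the job is launched with job parameters matching the #{jobParameters[...]} expressions in the reader, roughly like this (a sketch; not my exact launch code):

JobParameters params = new JobParametersBuilder()
        .addDate("startTime", new Date())
        .addLong("pageSize", 100L)
        .addLong("limit", 800L)
        .toJobParameters();
JobExecution execution = jobLauncher.run(conversionJob, params);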
EDIT
I was able to get Spring Batch working with an H2 database instead of a Map-based repository, but I'm still seeing the same issue. Here's how I configured Batch to use H2:
I imported the H2 driver:
<dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
    <version>1.4.200</version>
</dependency>
I configured my primary DB config to point to my JPA entities:
@Configuration
@EnableTransactionManagement
@EnableJpaRepositories(basePackages = "com.company.project.jpa.repository", transactionManagerRef = "transactionManager")
@EntityScan(basePackages = "com.company.project.jpa.entity")
public class DbConfig {

    @Bean
    @Primary
    @ConfigurationProperties("oracle.datasource")
    public DataSource dataSource() {
        return DataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public LocalContainerEntityManagerFactoryBean entityManagerFactory(DataSource dataSource,
                                                                       EntityManagerFactoryBuilder builder) {
        return builder.dataSource(dataSource).packages("com.company.project.jpa").build();
    }

    @Bean
    @Primary
    public PlatformTransactionManager transactionManager(
            @Qualifier("entityManagerFactory") LocalContainerEntityManagerFactoryBean entityManagerFactory) {
        return new JpaTransactionManager(entityManagerFactory.getObject());
    }
}
And then I configured my in-memory Batch management like this:
@Configuration
@EnableBatchProcessing
public class InMemoryBatchManagementConfig {

    @Bean(destroyMethod = "shutdown")
    public EmbeddedDatabase h2DataSource() {
        return new EmbeddedDatabaseBuilder().setType(EmbeddedDatabaseType.H2)
                .addScript("classpath:org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("classpath:org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean
    public LocalContainerEntityManagerFactoryBean h2EntityManagerFactory(EmbeddedDatabase h2DataSource,
                                                                         EntityManagerFactoryBuilder builder) {
        return builder.dataSource(h2DataSource).packages("org.springframework.batch.core").build();
    }

    @Bean
    public PlatformTransactionManager h2TransactionManager(
            @Qualifier("h2EntityManagerFactory") LocalContainerEntityManagerFactoryBean h2EntityManagerFactory) {
        return new JpaTransactionManager(h2EntityManagerFactory.getObject());
    }

    @Bean
    public JobRepository jobRepository(EmbeddedDatabase h2DataSource,
            @Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager) throws Exception {
        final JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDatabaseType(DatabaseType.H2.getProductName());
        factory.setDataSource(h2DataSource);
        factory.setTransactionManager(h2TransactionManager);
        factory.afterPropertiesSet(); // initialize the factory before asking it for the repository
        return factory.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }

    @Bean
    public JobRepositoryFactoryBean jobRepositoryFactoryBean(EmbeddedDatabase h2DataSource,
            @Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager) {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(h2DataSource);
        jobRepositoryFactoryBean.setTransactionManager(h2TransactionManager);
        return jobRepositoryFactoryBean;
    }

    @Bean
    public BatchConfigurer batchConfigurer(JobRepository jobRepository,
                                           SimpleJobLauncher jobLauncher,
                                           @Qualifier("h2TransactionManager") PlatformTransactionManager h2TransactionManager,
                                           JobExplorer jobExplorer) {
        return new BatchConfigurer() {
            @Override
            public JobRepository getJobRepository() {
                return jobRepository;
            }

            @Override
            public PlatformTransactionManager getTransactionManager() {
                return h2TransactionManager;
            }

            @Override
            public JobLauncher getJobLauncher() {
                return jobLauncher;
            }

            @Override
            public JobExplorer getJobExplorer() {
                return jobExplorer;
            }
        };
    }
}
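One thing worth noting: a JobExplorer is injected into batchConfigurer above, but no JobExplorer bean is defined in this class (presumably it comes from elsewhere). An H2-backed one would look roughly like this (a sketch using JobExplorerFactoryBean, not my actual bean):

@Bean
public JobExplorer jobExplorer(EmbeddedDatabase h2DataSource) throws Exception {
    // Sketch: a JobExplorer reading from the same embedded H2 batch schema.
    JobExplorerFactoryBean factory = new JobExplorerFactoryBean();
    factory.setDataSource(h2DataSource);
    factory.afterPropertiesSet();
    return factory.getObject();
}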