创建一个事件
自定义一个kafka消费结束的事件,其中CompositeBuilder是一个组合多个数据的内部静态类。
package cn.unuuc.springboot.study.event;
import lombok.Builder;
import lombok.Data;
import org.springframework.context.ApplicationEvent;
import java.time.LocalDateTime;
import java.util.List;
/**
* 自定义消费者结束事件
*/
public class KafkaConsumerEndEvent extends ApplicationEvent {
private CompositeBuilder compositeBuilder;
public KafkaConsumerEndEvent(Object source, CompositeBuilder compositeBuilder) {
super(source);
this.compositeBuilder = compositeBuilder;
}
public CompositeBuilder getCompositeBuilder() {
return compositeBuilder;
}
@Builder
@Data
public static class CompositeBuilder {
// 这里是组装的对象,大概示意一下
private String topic;
private LocalDateTime time;
private List<String> snapImgList;
// ........
}
}
事件发布者
package cn.unuuc.springboot.study.event;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
* kafka消费结束事件发布者
*/
@Component
public class KafkaConsumerPublisher {
@Resource
private ApplicationEventPublisher applicationEventPublisher;
public void publishEvent(KafkaConsumerEndEvent.CompositeBuilder compositeBuilder) {
applicationEventPublisher.publishEvent(compositeBuilder);
}
}
在kafka消费结束时发布
package cn.unuuc.springboot.study.service;
import cn.unuuc.springboot.study.event.KafkaConsumerEndEvent;
import cn.unuuc.springboot.study.event.KafkaConsumerPublisher;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.ArrayList;
@Service
public class KafkaService {
@Resource
KafkaConsumerPublisher kafkaConsumerPublisher;
public void kafkaTask() {
System.out.println("kafka开始执行");
System.out.println("kafka执行中");
System.out.println("kafka结束了,发布一个结束的事件");
KafkaConsumerEndEvent.CompositeBuilder build = KafkaConsumerEndEvent.CompositeBuilder.builder().time(LocalDateTime.now()).topic("hello-world").snapImgList(new ArrayList<>()).build();
kafkaConsumerPublisher.publishEvent(build);
}
}
事件监听者
package cn.unuuc.springboot.study.event;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaTaskEndListener {
@EventListener
public void handleUserLoginEvent(KafkaConsumerEndEvent.CompositeBuilder compositeBuilder) {
System.out.println("监听到kafka结束事件");
System.out.println(compositeBuilder.toString());
}
}
评论区