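Xiaomi Training Camp: Assignment 5
Task: read the contents of a file in which each line is a JSON object, and send the data to a queue via RocketMQ.
Implementation approach: calling the endpoint reads the hotel_data.txt file under the resources directory, parses each line into a Hotel object, and sends it to RocketMQ as a message.
Code: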
package cn.yomigaeri.xiaomi.controller;

import cn.yomigaeri.xiaomi.entity.po.Hotel;
import cn.yomigaeri.xiaomi.entity.po.Result;
import cn.yomigaeri.xiaomi.producer.Producer;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.web.bind.annotation.*;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@RestController
@RequiredArgsConstructor
public class HotelMessageController {

    private final ResourceLoader resourceLoader;
    private final Producer producer;
    private final ObjectMapper objectMapper;

    @GetMapping("/owner")
    public Result getHotel() throws IOException {
        return Result.success();
    }

    // Reads hotel_data.txt from the classpath and sends one message per hotel.
    @GetMapping("/hotel")
    public Result sendConfirmMessage() throws IOException {
        Resource resource = resourceLoader.getResource("classpath:hotel_data.txt");
        List<Hotel> hotels = parseHotelsFromFile(resource);
        for (Hotel hotel : hotels) {
            producer.sendConfirmMessage(hotel);
        }
        return Result.success("Hotels loaded and sent successfully. Count: " + hotels.size());
    }

    // Parses the resource line by line; each non-blank line is one JSON-encoded Hotel.
    private List<Hotel> parseHotelsFromFile(Resource resource) throws IOException {
        List<Hotel> hotels = new ArrayList<>();
        // getInputStream() also works when the application is packaged as a jar,
        // whereas getFile() only works for resources that live on the file system.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(resource.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                if (line.isBlank()) {
                    continue; // tolerate empty lines, e.g. a trailing newline
                }
                hotels.add(objectMapper.readValue(line, Hotel.class));
            }
        }
        return hotels;
    }
}
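The controller above collects every hotel in a list before sending anything, so a single malformed line aborts the whole request. As a rough alternative, the read, parse, and send steps could be streamed one line at a time, with failures recorded per line. The sketch below uses only the JDK: the `JsonLinesDispatcher` class and its `Summary` record are hypothetical names, and the `sender` callback merely stands in for "parse with `ObjectMapper`, then call `producer.sendConfirmMessage`"; it is not part of the original project.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Stream;

// Hypothetical helper: dispatches a JSON-lines stream one line at a time.
public class JsonLinesDispatcher {

    /** Outcome of one run: number of lines handed over successfully, and the 1-based numbers of lines that failed. */
    public record Summary(int sent, List<Integer> failedLines) {}

    /**
     * Passes each non-blank line to the sender. In the real application the
     * sender would parse the JSON and publish it to RocketMQ (assumption);
     * here it is any callback that may throw on a bad line.
     */
    public static Summary dispatch(Stream<String> lines, Consumer<String> sender) {
        List<Integer> failed = new ArrayList<>();
        int sent = 0;
        int lineNo = 0;
        Iterator<String> it = lines.iterator();
        while (it.hasNext()) {
            lineNo++;
            String json = it.next().strip();
            if (json.isEmpty()) {
                continue; // blank lines are skipped, not counted as failures
            }
            try {
                sender.accept(json);
                sent++;
            } catch (RuntimeException ex) {
                failed.add(lineNo); // remember the line and keep going
            }
        }
        return new Summary(sent, List.copyOf(failed));
    }
}
```

In the controller this could be fed with `reader.lines()` from the classpath resource. Whether skipping bad lines is preferable to failing fast depends on the assignment's requirements; the sketch only shows that the choice can be made per line.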
Consumer code:
package cn.yomigaeri.xiaomi.consumer;

import cn.yomigaeri.xiaomi.entity.po.Hotel;
import cn.yomigaeri.xiaomi.util.ElasticsearchUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RocketMQMessageListener(
        topic = "cjt0528",
        consumerGroup = "cjt-consumer"
)
public class Consumer implements RocketMQListener<Hotel> {

    private final ElasticsearchClient esClient = ElasticsearchUtil.createClient();

    @Override
    public void onMessage(Hotel hotel) {
        System.out.println("Consumer received message: " + hotel);

        // Use the hotel ID as the document ID, so indexing the same hotel
        // again overwrites the existing document instead of duplicating it.
        IndexRequest<Hotel> request = IndexRequest.of(b -> b
                .index("hotels")
                .id(hotel.getId().toString())
                .document(hotel)
        );

        IndexResponse response;
        try {
            response = esClient.index(request);
        } catch (IOException ex) {
            // Rethrow so that the failure is not silently swallowed.
            throw new RuntimeException(ex);
        }
        System.out.println("Document indexed successfully, ID: " + response.id());
    }
}
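One design point in the consumer is worth spelling out: RocketMQ delivers messages at least once, so the same hotel may reach `onMessage` more than once, for example after a failed attempt. Because the document ID is the hotel ID, a repeated index call overwrites the earlier document rather than creating a second one. The following JDK-only sketch simulates that behaviour; `IdempotentIndexDemo` is a hypothetical class, the map is only a stand-in for the `hotels` index, and `deliver` is a simplified model of redelivery, not RocketMQ's actual retry logic.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simulation: an in-memory map stands in for the "hotels" index.
public class IdempotentIndexDemo {

    private final Map<String, String> index = new LinkedHashMap<>();

    /** Like indexing with an explicit ID: an existing entry with the same ID is overwritten. */
    public void index(String id, String json) {
        index.put(id, json);
    }

    public int count() {
        return index.size();
    }

    public String get(String id) {
        return index.get(id);
    }

    /**
     * Simplified at-least-once delivery: the handler is invoked again until it
     * stops throwing, up to maxAttempts. Returns the number of attempts used.
     */
    public static int deliver(Runnable handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.run();
                return attempt;
            } catch (RuntimeException ex) {
                // The message would be redelivered; loop and try again.
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```

If the handler writes the document and then fails on its first attempt, the second attempt writes the same ID again and the index still holds exactly one entry for that hotel. With auto-generated IDs the same scenario would leave two documents behind.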
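Results:
Screenshot of the ES search results for a conditional query:
(The hotels being managed are all high-end, so the price must be greater than 500):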
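A "price greater than 500" condition corresponds to a `range` query with `gt` in the Elasticsearch Query DSL. The sketch below builds such a request with the JDK's `java.net.http` types only and does not send it. Several things here are assumptions rather than facts from the project: the field name `price` (the `Hotel` class is not shown), the base URL passed in by the caller, and the absence of authentication or TLS setup. `HotelPriceQuery` is a hypothetical class name; the index name `hotels` comes from the consumer code.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that prepares a range query against the _search endpoint.
public class HotelPriceQuery {

    /** Builds the Query DSL body for "field strictly greater than minValue". */
    public static String rangeQueryBody(String field, int minValue) {
        return "{\"query\":{\"range\":{\"" + field + "\":{\"gt\":" + minValue + "}}}}";
    }

    /** Prepares (but does not send) a POST to {baseUrl}/{index}/_search. */
    public static HttpRequest searchRequest(String baseUrl, String index, String body) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }
}
```

For example, `rangeQueryBody("price", 500)` yields `{"query":{"range":{"price":{"gt":500}}}}`, and the resulting request could be sent with `HttpClient.send` once the cluster address and credentials are known. Using `gte` instead of `gt` would include hotels priced at exactly 500.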
Screenshot of the total number of ES documents:
Screenshot of the ES document structure design: