小米训练营-第五次作业

读取文件的内容,每行是一个 json,通过 RocketMQ 发送数据到队列

​ 实现思路:调用接口,读取resource下的hotel_data.txt文件,解析文件并转化成Hotel类,并向RocketMQ发送消息

​ 具体代码:

package cn.yomigaeri.xiaomi.controller;


import cn.yomigaeri.xiaomi.entity.po.Hotel;
import cn.yomigaeri.xiaomi.entity.po.Result;
import cn.yomigaeri.xiaomi.producer.Producer;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import org.springframework.core.io.Resource;
import org.springframework.core.io.ResourceLoader;
import org.springframework.web.bind.annotation.*;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.stream.Stream;
import java.util.List;

/**
* @ClassName: ProductController
* @Package: cn.yomigaeri.xiaomi.controller
* @Description:
* @Author Yomigaeri
* @Create 2025/5/26 17:15
* @Version 1.0
*/



@RestController
@RequiredArgsConstructor
public class HotelMessageController {

private final ResourceLoader resourceLoader;
private final Producer producer;
private final ObjectMapper objectMapper;

@GetMapping("/owner")
public Result getHotel() throws IOException {
return Result.success();
}


@GetMapping("/hotel")
public Result sendConfirmMessage() throws IOException {
// Load the file from resources
Resource resource = resourceLoader.getResource("classpath:hotel_data.txt");
File file = resource.getFile();

// Parse JSON data from the file
List<Hotel> hotels = parseHotelsFromFile(file);

// Send each hotel data to the producer
for (Hotel hotel : hotels) {
producer.sendConfirmMessage(hotel); // or serialize to JSON if needed
}

return Result.success("Hotels loaded and sent successfully. Count: " + hotels.size());
}

private List<Hotel> parseHotelsFromFile(File file) throws IOException {
List<Hotel> hotels = new ArrayList<>();
try (Stream<String> lines = Files.lines(file.toPath())) {
for (String line : lines.toList()) {
Hotel hotel = objectMapper.readValue(line, Hotel.class);
hotels.add(hotel);
}
}
return hotels;
}
}

​ 消费者代码:

package cn.yomigaeri.xiaomi.consumer;

import cn.yomigaeri.xiaomi.entity.po.Hotel;
import cn.yomigaeri.xiaomi.entity.po.HotelDoc;
import cn.yomigaeri.xiaomi.util.ElasticsearchUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch.core.BulkRequest;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.IndexRequest;
import co.elastic.clients.elasticsearch.core.IndexResponse;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.security.cert.CertificateFactory;
import java.security.cert.X509Certificate;

/**
* @ClassName: Consumer
* @Package: cn.yomigaeri.xiaomi.consumer
* @Description:
* @Author Yomigaeri
* @Create 2025/5/28 16:36
* @Version 1.0
*/
@Component
@RocketMQMessageListener(
topic = "cjt0528",
consumerGroup = "cjt-consumer"
)
public class Consumer implements RocketMQListener<Hotel> {

private final ElasticsearchClient esClient = ElasticsearchUtil.createClient();

@Override
public void onMessage(Hotel hotel) {
System.out.println("消费者收到消息:" + hotel);
// 构建索引请求
IndexRequest<Hotel> request = IndexRequest.of(b -> b
.index("hotels") // 索引名称
.id(hotel.getId().toString()) // 文档ID
.document(hotel) // 文档内容
);

// 执行索引请求
IndexResponse response = null;
try {
response = esClient.index(request);
} catch (IOException ex) {
throw new RuntimeException(ex);
}

System.out.println("文档索引成功,ID: " + response.id());
}
}

​ 效果展示:

image-20250528175237243

消费数据,存储到 ElasticSearch 中,并查找 Dev Tools 对应姓名的 ES 的数据的截图(查询负责人为自己,酒店名称中包含 “如家” 或 (分数 > 3 分 同时 价格 < 500) 的酒店)

​ ES 按条件查询搜索结果截图:

image-20250528175314798

​ (管理的都是高档酒店,需要价格大于500):

image-20250528180651299

​ ES文档总数的截图:

image-20250528180517747

​ ES 文档结构设计截图:

image-20250528180603877