Spring Boot集成Apache Druid

Posted by Kaka Blog on December 28, 2020

Apache Druid简介

Apache Druid是一个实时分析型数据库,旨在对大型数据集进行快速的查询分析(”OLAP”查询)。Druid最常被当做数据库来用以支持实时摄取、高性能查询和高稳定运行的应用场景,同时,Druid也通常被用来助力分析型应用的图形化界面,或者当做需要快速聚合的高并发后端API,Druid最适合应用于面向事件类型的数据。

数据查询

HTTP POST

在Druid SQL查询中,可以通过HTTP方式发送POST请求到 /druid/v2/sql 来执行SQL查询。该请求应该是一个带有 “query” 字段的JSON对象,例如: {“query” : “SELECT COUNT(*) FROM data_source WHERE foo = ‘bar’”}

JDBC

可以使用 Avatica JDBC Driver 来进行Druid SQL查询。

实现JDBC查询

添加依赖:

<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.17.0</version>
</dependency>

编写Druid操作类:

@Component
public class DruidTemplate {
    private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();

    @Value("${apache.druid.url}")
    public void setUrl(String url) {
        this.url = url;
    }

    private String url;

    private AvaticaConnection connection() throws SQLException {
        Properties properties = new Properties();
        System.out.println("url = " + url);
        AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url, properties);
        System.out.println("connection = " + connection);
        threadLocal.set(connection);
        return connection;
    }

    private void closeConnection() {
        System.out.println("关闭线程:"+ threadLocal.get());
        AvaticaConnection conn = threadLocal.get();
        if(conn != null){
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            threadLocal.remove();
        }
    }

    public CachedRowSet executeQuery (String sql) {
        CachedRowSet rowSet = null;
        try {
            AvaticaStatement statement = connection().createStatement();
            ResultSet resultSet = statement.executeQuery(sql);
            rowSet = new CachedRowSetImpl();
            rowSet.populate(resultSet);
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            closeConnection();
        }
        return rowSet;
    }
}

注意:

很多情况我们使用ResultSet 就会因为这样那样的问题,rs被关闭或数据链接被关闭,导致ResultSet不能使用。其实这个问题我们可以用CachedRowSetImpl来解决。

配置文件添加配置:

apache.druid.url=jdbc:avatica:remote:url=http://192.168.10.34:8888/druid/v2/sql/avatica/

使用DruidTemplate

@Repository
public class DruidMapperImpl implements DruidMapper {
    @Autowired
    private DruidTemplate druidTemplate;
    @Override
    public List<WaterDO> listData() {
        String sql = "SELECT \"__time\", \"code\", \"location\", \"water_level\"\n" +
                "FROM \"test_water_level_5\"\n" +
                "WHERE \"__time\" >= CURRENT_TIMESTAMP - INTERVAL '10' DAY limit ?";
        try {
            List<WaterDO> waterDOList = new ArrayList<>();
            CachedRowSet rowSet = druidTemplate.executeQuery(sql);
            while (rowSet.next()) {
                WaterDO waterDO = new WaterDO();
                waterDO.setLocation(rowSet.getString("location"));
                waterDO.setCode(rowSet.getInt("code"));
                waterDO.setWaterLevel(rowSet.getInt("water_level"));
                waterDO.setTime(rowSet.getTimestamp("__time"));
                waterDOList.add(waterDO);
            }
            return waterDOList;
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}