Apache Druid简介
Apache Druid是一个实时分析型数据库,旨在对大型数据集进行快速的查询分析(”OLAP”查询)。Druid最常被当做数据库来用以支持实时摄取、高性能查询和高稳定运行的应用场景,同时,Druid也通常被用来助力分析型应用的图形化界面,或者当做需要快速聚合的高并发后端API,Druid最适合应用于面向事件类型的数据。
数据查询
HTTP POST
在Druid SQL查询中,可以通过HTTP方式发送POST请求到 /druid/v2/sql 来执行SQL查询。该请求应该是一个带有 “query” 字段的JSON对象,例如: {“query” : “SELECT COUNT(*) FROM data_source WHERE foo = ‘bar’”}
JDBC
可以使用 Avatica JDBC Driver 来进行Druid SQL查询。
实现JDBC查询
添加依赖:
<dependency>
<groupId>org.apache.calcite.avatica</groupId>
<artifactId>avatica-core</artifactId>
<version>1.17.0</version>
</dependency>
编写Druid操作类:
@Component
public class DruidTemplate {
private static ThreadLocal<AvaticaConnection> threadLocal = new ThreadLocal<>();
@Value("${apache.druid.url}")
public void setUrl(String url) {
this.url = url;
}
private String url;
private AvaticaConnection connection() throws SQLException {
Properties properties = new Properties();
System.out.println("url = " + url);
AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url, properties);
System.out.println("connection = " + connection);
threadLocal.set(connection);
return connection;
}
private void closeConnection() {
System.out.println("关闭线程:"+ threadLocal.get());
AvaticaConnection conn = threadLocal.get();
if(conn != null){
try {
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
threadLocal.remove();
}
}
public CachedRowSet executeQuery (String sql) {
CachedRowSet rowSet = null;
try {
AvaticaStatement statement = connection().createStatement();
ResultSet resultSet = statement.executeQuery(sql);
rowSet = new CachedRowSetImpl();
rowSet.populate(resultSet);
} catch (SQLException e) {
e.printStackTrace();
} finally {
closeConnection();
}
return rowSet;
}
}
注意:
很多情况我们使用ResultSet 就会因为这样那样的问题,rs被关闭或数据链接被关闭,导致ResultSet不能使用。其实这个问题我们可以用CachedRowSetImpl来解决。
配置文件添加配置:
apache.druid.url=jdbc:avatica:remote:url=http://192.168.10.34:8888/druid/v2/sql/avatica/
使用DruidTemplate
:
@Repository
public class DruidMapperImpl implements DruidMapper {
@Autowired
private DruidTemplate druidTemplate;
@Override
public List<WaterDO> listData() {
String sql = "SELECT \"__time\", \"code\", \"location\", \"water_level\"\n" +
"FROM \"test_water_level_5\"\n" +
"WHERE \"__time\" >= CURRENT_TIMESTAMP - INTERVAL '10' DAY limit ?";
try {
List<WaterDO> waterDOList = new ArrayList<>();
CachedRowSet rowSet = druidTemplate.executeQuery(sql);
while (rowSet.next()) {
WaterDO waterDO = new WaterDO();
waterDO.setLocation(rowSet.getString("location"));
waterDO.setCode(rowSet.getInt("code"));
waterDO.setWaterLevel(rowSet.getInt("water_level"));
waterDO.setTime(rowSet.getTimestamp("__time"));
waterDOList.add(waterDO);
}
return waterDOList;
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}