KUDU JAVA API

    技术2022-07-10  115

    Maven 依赖

    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>1.7.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
        <!-- <scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>

    代码

    import org.apache.kudu.ColumnSchema; import org.apache.kudu.Schema; import org.apache.kudu.Type; import org.apache.kudu.client.*; import org.junit.After; import org.junit.Before; import org.junit.Test; import java.util.ArrayList; import java.util.LinkedList; public class TestKudu { //声明全局变量KuduClient后期通过它来操作kudu表 private KuduClient kuduClient; //指定kuduMaster地址 private String kuduMaster; //指定表名 private String tableName; @Before public void init(){ //初始化操作 kuduMaster = "hadoop1:7051,hadoop2:7051,hadoop3:7051"; //指定表名 tableName = "student"; KuduClient.KuduClientBuilder kuduClientBuilder = new KuduClient.KuduClientBuilder(kuduMaster); kuduClientBuilder.defaultSocketReadTimeoutMs(1000000); kuduClient = kuduClientBuilder.build(); } //创建表 @Test public void createTable() throws KuduException { //判断表是否存在,不存在就构建 if(!kuduClient.tableExists(tableName)){ //构建创建表的schema信息 --- 就是表的字段和类型 ArrayList<ColumnSchema> columnSchemas = new ArrayList<>(); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.INT32).key(true).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("name",Type.STRING).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("age",Type.INT32).build()); columnSchemas.add(new ColumnSchema.ColumnSchemaBuilder("sex",Type.INT32).build()); Schema schema = new Schema(columnSchemas); //指定创建表的相关属性 CreateTableOptions options = new CreateTableOptions(); ArrayList<String> partitionList = new ArrayList<>(); //指定kudu表的分区字段是什么 partitionList.add("id"); //按照 id.hashcode % 分区数 = 分区号 options.addHashPartitions(partitionList,6); kuduClient.createTable(tableName,schema,options); } } /** * 插入数据 */ @Test public void insertTable() throws KuduException { //向表加载数据需要一个kuduSession对象 KuduSession kuduSession = kuduClient.newSession(); kuduSession.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC); //需要使用kuduTable来构建Operation的子类实例对象 KuduTable kuduTable = kuduClient.openTable(tableName); for(int i = 1;i<= 10;i++){ Insert insert = 
kuduTable.newInsert(); PartialRow row = insert.getRow(); row.addInt("id",i); row.addString("name","zhangsan-" + i); row.addInt("age",20 + i); row.addInt("sex",i%2); //最后时限执行数据的加载操作 kuduSession.apply(insert); } }

     

    Processed: 0.010, SQL: 9