当前位置: 代码迷 >> 综合 >> Kudu之Scala版本API
  详细解决方案

Kudu之Scala版本API

热度:49   发布时间:2023-09-14 15:24:15.0

建表:

// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()// 设置表名
val tableName = "kudu_test"// 创建列
val colums = List[ColumnSchema]((new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).key(true).nullable(false).build()),(new ColumnSchema.ColumnSchemaBuilder("age", Type.INT64).nullable(true).build()),(new ColumnSchema.ColumnSchemaBuilder("city", Type.STRING).nullable(true).build()))
val schema: Schema = new Schema(colums.asJava)// 设置hash分区
val cto: CreateTableOptions = new CreateTableOptions()
cto.setRangePartitionColumns(List("name").asJava).setNumReplicas(3)// 执行建表语句
kuduClient.createTable(tableName, schema, cto)// 关闭kudu连接
kuduClient.close()

kuduClient API 对表插入数据

// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()// 设置表名  
val tableName = "kudu_test"// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)// 开启一个会话
val  session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)// 创建插入对象并设置插入数据
val insert = kuduTable.newInsert()
val row = insert.getRow()
row.addString(0, "Ronnie")
row.addInt(1, 21)
row.addString(2, "beijing")//执行插入语句
session.apply(insert)// 同步数据,关闭会话
session.flush()
session.close()

kuduClient API 对表修改数据

// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()// 设置表名  
val tableName = "kudu_test"// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)// 开启一个会话
val  session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)// 创建updata对象
val update = kuduTable.newUpdate()
val rowUpdata = update.getRow()
rowUpdata.addString("name", "nnnn")
rowUpdata.addInt("age", 22)
rowUpdata.addString("city", "ddddd")// 同步数据并关闭会话    
session.apply(update)
session.flush()// 关闭kudu连接
kuduClient.close()

kuduClient API 对表执行upsert操作

如果该主键存在则执行updata操作,即修改表数据;如果该主键不存在,则执行insert操作,即插入数据。

// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()// 设置表名  
val tableName = "kudu_test"// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)// 开启一个会话
val  session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)// 创建upsert对象
val upsert = kuduTable.newUpsert()
val rowUpsert = upsert.getRow()
rowUpsert.addString("name", "nnnn")
rowUpsert.addInt("age", 19)
rowUpsert.addString("city", "mmmm")// 执行upsert操作
session.apply(upsert)// 同步数据并关闭会话
session.flush()
session.close()// 关闭kudu连接
kuduClient.close()// 关闭kudu连接
kuduClient.close()

kuduClient API 对表执行删除数据操作

// 创建kudu连接
val kuduClient = new KuduClient.KuduClientBuilder("172.20.85.29:7051").build()// 设置表名  
val tableName = "kudu_test"// 获得表的连接
val kuduTable = kuduClient.openTable(tableName)// 开启一个会话
val  session = kuduClient.newSession()
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH)// 创建删除对象并指定要删除的行
val delete = kuduTable.newDelete()
delete.getRow().addString("name", "Ronnie")// 执行删除操作
session.apply(delete)// 同步数据并关闭会话
session.flush()
session.close()// 关闭kudu连接
kuduClient.close()