HBase connector
A flow and a composite sink for writing elements to HBase.
HBase is a column-family NoSQL database backed by HDFS.
Usage
Build a converter and an HTableSettings instance.
The converter maps a domain object to an HBase Put, which names the row key and the columns to write.
- scala
-
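import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes

// Maps a Person to a Put keyed by "id_<id>", storing the name in column info:name.
// Put works on byte arrays, so every string is converted explicitly.
val hBaseConverter: Person => Put = { person =>
  val put = new Put(Bytes.toBytes(s"id_${person.id}"))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(person.name))
  put
}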
- java
-
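// Requires: import java.nio.charset.StandardCharsets;
// Using the StandardCharsets.UTF_8 constant avoids the checked
// UnsupportedEncodingException, so the converter can never return null.
Function<Person, Put> hBaseConverter = person -> {
  Put put = new Put(String.format("id_%d", person.id).getBytes(StandardCharsets.UTF_8));
  put.addColumn(
      "info".getBytes(StandardCharsets.UTF_8),
      "name".getBytes(StandardCharsets.UTF_8),
      person.name.getBytes(StandardCharsets.UTF_8));
  return put;
};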
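The converters above assume a Person domain class that this page does not show. The following is a minimal sketch of what it might look like in Java, together with a small helper that spells out the row layout the converter produces (row key "id_<id>", column family "info", qualifier "name"). Both Person as written here and PersonRowLayout are hypothetical names used for illustration only; the sketch deliberately does not depend on the HBase client.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical domain class, inferred from how the snippets on this page use it.
final class Person {
    final int id;
    final String name;

    Person(int id, String name) {
        this.id = id;
        this.name = name;
    }
}

// Illustration only: describes the row layout without touching HBase.
final class PersonRowLayout {
    static final String FAMILY = "info";
    static final String QUALIFIER = "name";

    // Row key used by the converter: "id_" followed by the numeric id.
    static String rowKey(Person person) {
        return String.format("id_%d", person.id);
    }

    // HBase stores keys and values as raw bytes; UTF-8 matches the Java converter.
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Person person = new Person(42, "zozo_42");
        // prints "id_42 -> info:name = zozo_42"
        System.out.println(
            rowKey(person) + " -> " + FAMILY + ":" + QUALIFIER + " = " + person.name);
    }
}
```

Prefixing the numeric id keeps row keys readable in the HBase shell, but note that HBase sorts rows by the bytes of the key, so "id_10" sorts before "id_2".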
The table is created on demand.
- scala
-
val tableSettings = HTableSettings(HBaseConfiguration.create(), TableName.valueOf("person"), immutable.Seq("info"), hBaseConverter)
- java
-
HTableSettings<Person> tableSettings =
    HTableSettings.create(
        HBaseConfiguration.create(),
        TableName.valueOf("person1"),
        Arrays.asList("info"),
        hBaseConverter);
Flow usage
- scala
-
val flow = HTableStage.flow[Person](tableSettings)
val f = Source(11 to 20)
  .map(i => Person(i, s"zozo_$i"))
  .via(flow)
  .runWith(Sink.fold(0)((a, d) => a + d.id))
- java
-
Flow<Person, Person, NotUsed> flow = HTableStage.flow(tableSettings);
Pair<NotUsed, CompletionStage<List<Person>>> run =
    Source.from(Arrays.asList(200, 201, 202, 203, 204))
        .map((i) -> new Person(i, String.format("name_%d", i)))
        .via(flow)
        .toMat(Sink.seq(), Keep.both())
        .run(materializer);
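To make the data flow in the examples above easier to follow, here is a small in-memory model of it. It assumes that the flow emits each element downstream once it has been written, which the Flow<Person, Person, NotUsed> signature suggests. This is not the connector's implementation: the map is only a stand-in for the HBase table, and PassThroughSketch, writeAndEmit, and sumOfWrittenIds are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: an in-memory stand-in for the assumed write-then-emit behaviour.
final class PassThroughSketch {
    // Hypothetical domain class, mirroring the Person used on this page.
    static final class Person {
        final int id;
        final String name;

        Person(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Stand-in for the HBase table: row key -> value of the info:name column.
    static final Map<String, String> table = new LinkedHashMap<>();

    // "Writes" the element, then hands the same element to the next step.
    static Person writeAndEmit(Person person) {
        table.put(String.format("id_%d", person.id), person.name);
        return person;
    }

    // Mirrors the Scala example: ids 11 to 20 pass through the write step and are summed.
    static int sumOfWrittenIds() {
        int sum = 0;
        for (int i = 11; i <= 20; i++) {
            sum += writeAndEmit(new Person(i, "zozo_" + i)).id;
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(sumOfWrittenIds()); // prints 155
        System.out.println(table.size());      // prints 10
    }
}
```

Under that assumption, the Scala example's future f would complete with 155, and the Java example's Sink.seq would collect the five Person elements in the order they were written.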
Sink usage
- scala
-
val sink = HTableStage.sink[Person](tableSettings)
val f = Source(1 to 10)
  .map(i => Person(i, s"zozo_$i"))
  .runWith(sink)
- java
-
final Sink<Person, scala.concurrent.Future<Done>> sink = HTableStage.sink(tableSettings);
Future<Done> o =
    Source.from(Arrays.asList(100, 101, 102, 103, 104))
        .map((i) -> new Person(i, String.format("name %d", i)))
        .runWith(sink, materializer);
Basic HBase commands:
$HBASE_HOME/bin/start-hbase.sh
$HBASE_HOME/bin/hbase shell
From the hbase shell:
list                # list all tables
scan "person"       # roughly equivalent to: select * from person
disable "person"    # a table must be disabled before it can be dropped
drop "person"