Enriching Streams with Hive tables via Flink SQL

by datatabloid_difmmk

Introduction

Stream processing is about creating business value by applying logic to data in motion. Data streams are often combined with other data sources to power that logic: Flink SQL applies functions to the data and sends the results to a sink. Business use cases such as fraud detection, ad impression tracking, healthcare data enrichment, financial spend enrichment, GPS device data enrichment, and personalized customer communications are great examples of using Hive tables to enrich data streams. There are two common use cases for Hive tables with Flink SQL:

  1. Lookup tables for enriching data streams
  2. Sink for writing Flink results

There are also two ways to use a Hive table for either of these use cases: you can use the Hive catalog, or you can use the Flink JDBC connector in a Flink DDL statement. Let’s discuss how they work, along with their pros and cons.

Register Hive Catalog in SQL Stream Builder

SQL Stream Builder (SSB) was built to give analysts the power of Flink in a no-code interface. Registering a Hive catalog in SSB is easy:

  1. Click the “Data Providers” menu in the sidebar.
  2. Click “Register Catalog” at the bottom.
  3. Select “Hive” as the catalog type.
  4. Give it a name.
  5. Declare your default database.
  6. Click “Validate”.
  7. After successful validation, click “Create”.

After the steps above, your Hive tables will appear in the table list whenever the Hive catalog is selected as the active catalog. Currently, via the catalog concept, Flink only supports non-transactional Hive tables when accessed directly from HDFS for reading or writing.
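For example, once the catalog is active, its tables can be queried directly in SSB. A minimal sketch, assuming a catalog registered as “hive” and an illustrative database and table name:

SELECT id, category
-- catalog.database.table; the names here are illustrative
FROM `hive`.`default`.`item_category`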

Using Flink DDL with JDBC Connectors

The Flink JDBC connector allows you to create a Flink table for any Hive table right from the console screen, where the Flink DDL creation script for the table can be written. It specifies the URL of the Hive DB and the table name. All Hive tables can be accessed this way, regardless of their type. JDBC DDL statements can also be generated via “Templates”: click “Templates” -> “jdbc” and the console will paste the code into the editor.

CREATE TABLE `ItemCategory_transactional_jdbc_2` (
 `id` VARCHAR(2147483647),
 `category` VARCHAR(2147483647)
) WITH (
 'connector' = 'jdbc',
 'lookup.cache.ttl' = '10s',
 'lookup.cache.max-rows' = '10000',
 'table-name' = 'item_category_transactional',
 'url' = 'jdbc:hive2://<host>:<port>/default'
)

Using a Hive table as a lookup table

Hive tables are often used as lookup tables to enrich a Flink stream. Flink can cache the data found in Hive tables to improve performance. The FOR SYSTEM_TIME AS OF clause needs to be set to tell Flink to join with the temporal table. For more information, check the relevant Flink documentation.

SELECT t.itemId, i.category
FROM TransactionsTable t
LEFT JOIN ItemCategory_transactional_jdbc FOR SYSTEM_TIME AS OF t.event_time i ON i.id = t.itemId

Hive Catalog tables

For Hive catalog tables, the TTL (time to live) of the cached lookup table can be configured with the “lookup.join.cache.ttl” property of the Hive table (this value defaults to one hour), set on the table itself from Beeline or Hue.
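As a minimal sketch, assuming an illustrative table name and TTL value, the property could be set from Beeline like this:

-- set the lookup cache TTL as a Hive table property (table name and value are illustrative)
ALTER TABLE item_category SET TBLPROPERTIES ('lookup.join.cache.ttl'='2h');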

Pros: No DDL needs to be defined; a simple Hive catalog will work.

Cons: Only works with non-transactional tables.

Flink DDL tables with JDBC connector

When using Hive tables with the JDBC connector, the default is no caching, which means Flink reaches out to Hive for each entry that needs enriching. This can be changed by specifying two properties in the DDL statement: lookup.cache.max-rows and lookup.cache.ttl.

Flink looks up the cache first and only sends a request to the external database on a cache miss, updating the cache with the rows that are returned. The oldest row in the cache expires when the cache hits the maximum number of rows (lookup.cache.max-rows) or when a row exceeds the maximum time to live (lookup.cache.ttl). Cached rows may therefore not be up to date. Some users may want to lower lookup.cache.ttl to refresh the data more frequently, but this can increase the number of requests sent to the database. Users have to balance throughput against the freshness of the cached data.

CREATE TABLE `ItemCategory_transactional_jdbc_2` (
 `id` VARCHAR(2147483647),
 `category` VARCHAR(2147483647)
) WITH (
 'connector' = 'jdbc',
 'lookup.cache.ttl' = '10s',
 'lookup.cache.max-rows' = '10000',
 'table-name' = 'item_category_transactional',
 'url' = 'jdbc:hive2://<host>:<port>/default'
)

Pros: All Hive tables can be accessed this way, and caching can be fine-tuned.

Cons: The cache parameters have to be handled with care; adjust them as needed to balance join performance against the freshness of the data coming from Hive.

Using a Hive table as a sink

Storing the output of a Flink job in a Hive table allows you to store processed data for various needs. To do this, use an INSERT INTO statement that writes the results of a query into the specified Hive table. Note that you may need to adjust the checkpoint timeout period of a JDBC sink job that writes into a Hive ACID table.

INSERT INTO ItemCategory_transactional_jdbc_2
SELECT t.itemId, i.category
FROM TransactionsTable t
LEFT JOIN ItemCategory_transactional_jdbc FOR SYSTEM_TIME AS OF t.event_time i ON i.id = t.itemId

Hive Catalog tables

No DDL needs to be written. Only non-transactional tables are supported, thus it only works with append-only streams.
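For example, a minimal sketch of appending enriched records into a Hive catalog table (the catalog and table names are illustrative assumptions):

-- append-only insert into a non-transactional Hive catalog table; names are illustrative
INSERT INTO `hive`.`default`.`enriched_transactions`
SELECT t.itemId, i.category
FROM TransactionsTable t
LEFT JOIN ItemCategory_transactional_jdbc FOR SYSTEM_TIME AS OF t.event_time i ON i.id = t.itemId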

Flink DDL tables with JDBC connector

With this option, upsert-type data can be written into transactional tables. To be able to do that, a primary key must be defined.

CREATE TABLE `ItemCategory_transactional_jdbc_sink` (
 `id` STRING,
 `category` STRING,
 PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
 'connector' = 'jdbc',
 'table-name' = 'item_category_transactional_sink',
 'url' = 'jdbc:hive2://<host>:<port>/default'
)

When a job writes to this table, Flink will overwrite any record whose primary key value already exists in the table. This also works for upsert streams, as well as with transactional Hive tables.
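As an illustration, a sketch of an upsert-producing query feeding the sink table defined above; the source table ItemCategory_updates and the aggregation are hypothetical, but any query whose result updates per key would behave the same way:

-- keep only the latest category per id; the primary key on the sink turns updates into overwrites
-- (ItemCategory_updates is a hypothetical source table)
INSERT INTO ItemCategory_transactional_jdbc_sink
SELECT id, LAST_VALUE(category) AS category
FROM ItemCategory_updates
GROUP BY id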

Conclusion

We’ve seen how to use SSB to enrich Flink’s data streams with Hive tables, and how to use Hive tables as sinks for Flink’s results. This is useful for many business use cases, including enriching data streams with lookup data. We dug deeper into different approaches using Hive tables. We also discussed the pros and cons of different approaches, as well as various cache-related options to improve performance. You can use this information to decide which approach is best for you.

If you want to try SQL Stream Builder, be sure to download the Community Edition today!


