HiveQL: Views:
Views to Reduce Query Complexity
When a query becomes long or complicated, a view may be used to hide the complexity
by dividing the query into smaller, more manageable pieces, similar to writing a function
in a programming language or the concept of layered design in software. Encapsulating
the complexity makes it easier for end users to construct complex queries from
reusable parts. For example, consider the following query with a nested subquery:
FROM (
SELECT * FROM people JOIN cart
ON (cart.people_id=people.id) WHERE firstname='john'
) a SELECT a.lastname WHERE a.id=3;
It is common for Hive queries to have many levels of nesting. In the following example,
the nested portion of the query is turned into a view:
CREATE VIEW shorter_join AS
SELECT * FROM people JOIN cart
ON (cart.people_id=people.id) WHERE firstname='john';
Now the view is used like any other table. In this query we added a WHERE clause to the
SELECT statement. This exactly emulates the original query:
SELECT lastname FROM shorter_join WHERE id=3;
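The function analogy can be made literal. The following plain-Java sketch is only an illustration, not how Hive executes a view. The class name `ViewSketch`, the sample rows, and the column layout are hypothetical. The method `shorterJoin()` plays the role of the view (the join plus the firstname filter), and `lastnamesForId()` is the outer query built on top of it:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory data: people rows are {id, firstname, lastname},
// cart rows are {people_id, item}.
class ViewSketch {
    static final String[][] PEOPLE = {
        {"1", "john", "smith"}, {"3", "john", "doe"}, {"4", "mary", "jones"}
    };
    static final String[][] CART = {
        {"1", "book"}, {"3", "lamp"}, {"3", "pen"}, {"4", "cup"}
    };

    // Plays the role of the shorter_join view: the join plus the firstname filter.
    // Each result row is {id, firstname, lastname, people_id, item}.
    static List<String[]> shorterJoin() {
        List<String[]> rows = new ArrayList<>();
        for (String[] p : PEOPLE) {
            if (!p[1].equals("john")) continue;
            for (String[] c : CART) {
                if (c[0].equals(p[0])) {
                    rows.add(new String[] {p[0], p[1], p[2], c[0], c[1]});
                }
            }
        }
        return rows;
    }

    // The outer query: SELECT lastname FROM shorter_join WHERE id = <id>.
    static List<String> lastnamesForId(String id) {
        List<String> out = new ArrayList<>();
        for (String[] row : shorterJoin()) {
            if (row[0].equals(id)) out.add(row[2]);
        }
        return out;
    }
}
```

Callers of `lastnamesForId()` never see the join logic, just as users of shorter_join never see the nested subquery.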
Views and Map Type for Dynamic Tables:
Hive supports the array, map, and struct data types. These
data types are not common in traditional databases, as they break first normal form.
Hive’s ability to treat a line of text as a map, rather than a fixed set of columns, combined
with the view feature, allows you to define multiple logical tables over one physical table:
CREATE EXTERNAL TABLE dynamictable(cols map<string,string>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\004'
COLLECTION ITEMS TERMINATED BY '\001'
MAP KEYS TERMINATED BY '\002'
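STORED AS TEXTFILE;
The first view, orders, returns the state, city, and part columns from rows where the type is request: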
CREATE VIEW orders(state, city, part) AS
SELECT cols["state"], cols["city"], cols["part"]
FROM dynamictable
WHERE cols["type"] = "request";
A second view, named shipments, returns the time and part columns
from rows where the type is response:
CREATE VIEW shipments(time, part) AS
SELECT cols["time"], cols["part"]
FROM dynamictable
WHERE cols["type"] = "response";
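To make the delimiter settings concrete, the following Java sketch decodes one line of dynamictable into a map: entries are split on \001 (COLLECTION ITEMS TERMINATED BY) and each entry is split into key and value on \002 (MAP KEYS TERMINATED BY). The class name, the sample data, and the simplified parsing (no escape handling, and entries without a key separator are skipped) are assumptions for illustration; Hive's own SerDe does the real work. `asOrder()` mimics the orders view by filtering on the type key and projecting three entries:

```java
import java.util.HashMap;
import java.util.Map;

class MapRowSketch {
    // Decodes one text line into a map, mirroring
    // COLLECTION ITEMS TERMINATED BY '\001' and MAP KEYS TERMINATED BY '\002'.
    static Map<String, String> parse(String line) {
        Map<String, String> cols = new HashMap<>();
        for (String entry : line.split("\u0001")) {
            int sep = entry.indexOf('\u0002');
            if (sep < 0) continue; // skip entries without a key/value separator
            cols.put(entry.substring(0, sep), entry.substring(sep + 1));
        }
        return cols;
    }

    // Rough analogue of the orders view: keep "request" rows, project state, city, part.
    static String[] asOrder(String line) {
        Map<String, String> cols = parse(line);
        if (!"request".equals(cols.get("type"))) return null; // removed by the WHERE clause
        return new String[] {cols.get("state"), cols.get("city"), cols.get("part")};
    }
}
```

A request line yields a three-element row, while a response line returns null because the view's WHERE clause would filter it out; the shipments view would be the mirror image.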
HiveQL: Indexes:
Creating an Index
Let’s create an index for the managed, partitioned employees table described in
“Partitioned, Managed Tables.” Here is the table definition we used previously,
for reference:
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING);
CREATE INDEX employees_index
ON TABLE employees (country)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD
IDXPROPERTIES ('creator' = 'me', 'created_at' = 'some_time')
IN TABLE employees_index_table
PARTITIONED BY (country, name)
COMMENT 'Employees indexed by country and name.';
Bitmap Indexes:
Bitmap indexes are commonly used
for columns with few distinct values. Here is our previous example rewritten to use the
bitmap index handler:
CREATE INDEX employees_index
ON TABLE employees (country)
AS 'BITMAP'
WITH DEFERRED REBUILD
IDXPROPERTIES ('creator' = 'me', 'created_at' = 'some_time')
IN TABLE employees_index_table
PARTITIONED BY (country, name)
COMMENT 'Employees indexed by country and name.';
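Why do few distinct values suit a bitmap index? The index keeps one bitmap per distinct value, with one bit per row, so it stays compact when there are only a handful of values, and filters become bitwise operations. The sketch below uses `java.util.BitSet` to illustrate the general idea only; it is not Hive's BITMAP handler, whose storage layout differs, and the class name and sample data are hypothetical:

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.Map;

class BitmapIndexSketch {
    private final Map<String, BitSet> bitmaps = new HashMap<>();

    // Builds one bitmap per distinct value; bit i is set when row i has that value.
    BitmapIndexSketch(String[] column) {
        for (int row = 0; row < column.length; row++) {
            bitmaps.computeIfAbsent(column[row], v -> new BitSet()).set(row);
        }
    }

    // Rows where the column equals any of the given values (a bitwise OR).
    BitSet rowsMatching(String... values) {
        BitSet result = new BitSet();
        for (String v : values) {
            BitSet b = bitmaps.get(v);
            if (b != null) result.or(b);
        }
        return result;
    }

    int distinctValues() {
        return bitmaps.size();
    }
}
```

For a country column such as {US, CA, US, MX, US}, only three bitmaps exist, and `rowsMatching("CA", "MX")` answers an IN-style filter with a single OR.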
Rebuilding the Index:
If you specified WITH DEFERRED REBUILD, the new index starts empty. At any time, the
index can be built for the first time or rebuilt using the ALTER INDEX statement:
ALTER INDEX employees_index
ON TABLE employees
PARTITION (country = 'US')
REBUILD;
If the PARTITION clause is omitted, the index is rebuilt for all partitions.
There is no built-in mechanism to trigger an automatic rebuild of the index if the underlying
table or a particular partition changes. However, if you have a workflow that
updates table partitions with data, one where you might already use the ALTER TABLE ...
TOUCH PARTITION(...) statement, that same workflow can also issue ALTER INDEX ... REBUILD
for the affected partitions.
Showing an Index
The following command will show all the indexes defined for any column in the indexed
table:
SHOW FORMATTED INDEX ON employees;
FORMATTED is optional. It causes column titles to be added to the output. You can also
replace INDEX with INDEXES, as the output may list multiple indexes.
Dropping an Index:
Dropping an index also drops the index table, if any:
DROP INDEX IF EXISTS employees_index ON TABLE employees;
Hive won’t let you attempt to drop the index table directly with DROP TABLE. As always,
IF EXISTS is optional and serves to suppress errors if the index doesn’t exist.
If the table that was indexed is dropped, the index itself and its table are dropped. Similarly,
if a partition of the original table is dropped, the corresponding partition index
is also dropped.
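Schema Design:
A common pattern in other databases is to create one table per day, with the date embedded in the table name: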
hive> CREATE TABLE supply_2011_01_02 (id int, part string, quantity int);
hive> CREATE TABLE supply_2011_01_03 (id int, part string, quantity int);
hive> CREATE TABLE supply_2011_01_04 (id int, part string, quantity int);
hive> .... load data ...
hive> SELECT part,quantity FROM supply_2011_01_02
> WHERE quantity < 4
> UNION ALL
> SELECT part,quantity FROM supply_2011_01_03
> WHERE quantity < 4;
With Hive, a partitioned table should be used instead. Hive uses expressions in the
WHERE clause to select input only from the partitions needed for the query. This query
will run efficiently, and it is clean and easy on the eyes:
hive> CREATE TABLE supply (id int, part string, quantity int)
> PARTITIONED BY (day int);
hive> ALTER TABLE supply ADD PARTITION (day=20110102);
hive> ALTER TABLE supply ADD PARTITION (day=20110103);
hive> ALTER TABLE supply ADD PARTITION (day=20110104);
hive> .... load data ...
hive> SELECT part,quantity FROM supply
> WHERE day>=20110102 AND day<=20110103 AND quantity < 4;
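Partition pruning can be pictured as selecting the matching partition directories before reading any rows. In this Java sketch, the class name and sample rows are hypothetical, and a `TreeMap` keyed by day stands in for the partition directories. The range lookup means rows stored under other days are never touched:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PartitionPruningSketch {
    // Partition key (day) -> rows {part, quantity}; the TreeMap keeps days ordered.
    final TreeMap<Integer, List<String[]>> partitions = new TreeMap<>();
    int partitionsScanned = 0;

    void add(int day, String part, int quantity) {
        partitions.computeIfAbsent(day, d -> new ArrayList<>())
                  .add(new String[] {part, Integer.toString(quantity)});
    }

    // Like WHERE day >= from AND day <= to AND quantity < maxQty:
    // only partitions inside [from, to] are opened at all.
    List<String> query(int from, int to, int maxQty) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<Integer, List<String[]>> e : partitions.subMap(from, true, to, true).entrySet()) {
            partitionsScanned++;
            for (String[] row : e.getValue()) {
                if (Integer.parseInt(row[1]) < maxQty) out.add(row[0]);
            }
        }
        return out;
    }
}
```

With data loaded for 20110102 through 20110104, a query over 20110102..20110103 opens two partitions and skips the third entirely.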
Over Partitioning:
A design that creates too many partitions may optimize some
queries but be detrimental for other important queries:
hive> CREATE TABLE weblogs (url string, time bigint)
> PARTITIONED BY (day int, state string, city string);
hive> SELECT * FROM weblogs WHERE day=20110102;
HDFS was designed for many millions of large files, not billions of small files. The first
drawback of having too many partitions is the large number of Hadoop files and directories
that are created unnecessarily.
Each partition corresponds to a directory that
usually contains multiple files. If a given table contains thousands of partitions, it may
have tens of thousands of files, possibly created every day. If this table is retained
for years, it will eventually exhaust the capacity of the NameNode to
manage the filesystem metadata. The NameNode must keep all metadata for the filesystem
in memory. While each file requires a small number of bytes for its metadata
(approximately 150 bytes/file), the net effect is to impose an upper limit on the total
number of files that can be managed in an HDFS installation. Other filesystems, like
MapR and Amazon S3, don’t have this limitation.
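The 150-bytes-per-file figure allows a back-of-the-envelope estimate. In this Java sketch, the table shape (partitions per day, files per partition, retention in days) is a hypothetical input, and the estimate counts only file entries, not directories or blocks, so real NameNode memory use would be higher:

```java
class NameNodeEstimate {
    static final long BYTES_PER_FILE = 150; // approximate figure quoted in the text

    // Files accumulated by a table that adds partitionsPerDay partitions,
    // each holding filesPerPartition files, every day for `days` days.
    static long files(long partitionsPerDay, long filesPerPartition, long days) {
        return partitionsPerDay * filesPerPartition * days;
    }

    // Estimated NameNode memory for those file entries alone.
    static long metadataBytes(long partitionsPerDay, long filesPerPartition, long days) {
        return files(partitionsPerDay, filesPerPartition, days) * BYTES_PER_FILE;
    }
}
```

For a hypothetical 1,000 partitions per day, 10 files per partition, and three years (1,095 days) of retention, `files()` returns 10,950,000 and `metadataBytes()` returns 1,642,500,000, roughly 1.6 GB of NameNode memory for a single table's file entries.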
Partitioning only by day, with state and city kept as ordinary columns, may be more
appropriate, especially if query WHERE clauses typically select ranges of smaller
granularities:
hive> CREATE TABLE weblogs (url string, time bigint, state string, city string)
> PARTITIONED BY (day int);
hive> SELECT * FROM weblogs WHERE day=20110102;
Another solution is to use two levels of partitions along different dimensions. For example,
the first partition might be by day and the second-level partition might be by
geographic region, like the state:
hive> CREATE TABLE weblogs (url string, time bigint, city string)
> PARTITIONED BY (day int, state string);
hive> SELECT * FROM weblogs WHERE day=20110102;
However, since some states will probably result in much more data than others, you could
see imbalanced map tasks, as processing the larger states takes a lot longer than processing
the smaller states.
Unique Keys and Normalization:
Unlike relational databases, Hive does not have the
concept of primary keys or automatic, sequence-based key generation. Joins should be
avoided in favor of denormalized data, when feasible. The complex types, Array, Map,
and Struct, help by allowing the storage of one-to-many data inside a single row. This
is not to say normalization should never be utilized, but star-schema type designs are
nonoptimal.
The primary reason to avoid normalization is to minimize disk seeks, such as those
typically required to navigate foreign key relations. Denormalizing data permits it to
be scanned from or written to large, contiguous sections of disk drives, which optimizes
I/O performance. However, you pay the penalties of denormalization: data duplication
and a greater risk of inconsistent data.
Making Multiple Passes over the Same Data:
For example, each of the following two queries creates a table from the same source
table, history:
hive> INSERT OVERWRITE TABLE sales
> SELECT * FROM history WHERE action='purchased';
hive> INSERT OVERWRITE TABLE credits
> SELECT * FROM history WHERE action='returned';
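This is correct, but it reads history twice. Hive’s multi-insert syntax produces both tables
in a single pass over the source table:
hive> FROM history
> INSERT OVERWRITE TABLE sales SELECT * WHERE action='purchased'
> INSERT OVERWRITE TABLE credits SELECT * WHERE action='returned';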
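The saving from the single-pass form can be sketched by counting row reads. This Java sketch is a simplification (real jobs read blocks, not individual rows), and the class name and `{id, action}` row layout are hypothetical. Both methods produce the same sales and credits lists, but the multi-insert version reads each history row once:

```java
import java.util.List;

// Rows are hypothetical {id, action} pairs from the history table.
class MultiInsertSketch {
    // Two separate INSERT ... SELECT statements: history is scanned twice.
    static int twoPassRowReads(List<String[]> history, List<String> sales, List<String> credits) {
        int reads = 0;
        for (String[] row : history) {
            reads++;
            if (row[1].equals("purchased")) sales.add(row[0]);
        }
        for (String[] row : history) {
            reads++;
            if (row[1].equals("returned")) credits.add(row[0]);
        }
        return reads;
    }

    // FROM history INSERT ... INSERT ...: each row is read once and
    // checked against every destination's WHERE condition.
    static int singlePassRowReads(List<String[]> history, List<String> sales, List<String> credits) {
        int reads = 0;
        for (String[] row : history) {
            reads++;
            if (row[1].equals("purchased")) sales.add(row[0]);
            if (row[1].equals("returned")) credits.add(row[0]);
        }
        return reads;
    }
}
```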
The Case for Partitioning Every Table:
Many ETL processes involve multiple processing steps. Each step may produce one or
more temporary tables that are only needed until the end of the next job. At first it may
appear that partitioning these temporary tables is unnecessary. However, imagine a
scenario where a mistake in a step’s query or raw data forces a rerun of the ETL process
for several days of input. You will likely need to run the catch-up process a day at a
time in order to make sure that one job does not overwrite the temporary table before
other tasks have completed.
For example, the following design creates an intermediate table named
distinct_ip_in_logs to be used by a subsequent processing step:
$ hive -hiveconf dt=2011-01-01
hive> INSERT OVERWRITE table distinct_ip_in_logs
> SELECT distinct(ip) as ip from weblogs
> WHERE hit_date='${hiveconf:dt}';
hive> CREATE TABLE state_city_for_day (state string,city string);
hive> INSERT OVERWRITE TABLE state_city_for_day
> SELECT DISTINCT state, city FROM distinct_ip_in_logs
> JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip);
This approach works; however, computing a single day causes the record of the previous
day to be removed by the INSERT OVERWRITE clause. If two instances of this process are
run at once for different days, they could stomp on each other’s results.
A more robust approach is to carry the partition information all the way through the
process. This makes synchronization a nonissue. Also, as a side effect, this approach
allows you to compare the intermediate data day over day:
$ hive -hiveconf dt=2011-01-01
hive> INSERT OVERWRITE table distinct_ip_in_logs
> PARTITION (hit_date='${hiveconf:dt}')
> SELECT distinct(ip) as ip from weblogs
> WHERE hit_date='${hiveconf:dt}';
hive> CREATE TABLE state_city_for_day (state string,city string)
> PARTITIONED BY (hit_date string);
hive> INSERT OVERWRITE TABLE state_city_for_day PARTITION (hit_date='${hiveconf:dt}')
> SELECT DISTINCT state, city FROM distinct_ip_in_logs
> JOIN geodata ON (distinct_ip_in_logs.ip=geodata.ip)
> WHERE (hit_date='${hiveconf:dt}');
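The difference between the two designs comes down to what INSERT OVERWRITE replaces. In this Java sketch, with a hypothetical class name and sample IP addresses, an unpartitioned table is a single set that each run replaces wholesale, while the partitioned table is a map from hit_date to a set, so each run replaces only its own day:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class IntermediateTableSketch {
    // Unpartitioned table: INSERT OVERWRITE replaces the whole table.
    Set<String> unpartitioned = new HashSet<>();
    // Table partitioned by hit_date: INSERT OVERWRITE ... PARTITION replaces one day.
    final Map<String, Set<String>> byHitDate = new HashMap<>();

    void overwriteTable(List<String> ips) {
        unpartitioned = new HashSet<>(ips);
    }

    void overwritePartition(String hitDate, List<String> ips) {
        byHitDate.put(hitDate, new HashSet<>(ips));
    }
}
```

Running the job for 2011-01-01 and then 2011-01-02 leaves only the second day in the unpartitioned table, while the partitioned table keeps both days, which is also what makes day-over-day comparison possible.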
A drawback of this approach is that you will need to manage the intermediate table
and delete older partitions, but these tasks are easy to automate.
Bucketing Table Data Storage
Bucketing is another technique for decomposing data sets into more manageable parts.
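Partitioning by a column with many distinct values, such as user_id, would create a
separate partition, and thus a separate directory, for every user. So, the following commands might fail: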
hive> CREATE TABLE weblog (url STRING, source_ip STRING)
> PARTITIONED BY (dt STRING, user_id INT);
hive> FROM raw_weblog
> INSERT OVERWRITE TABLE weblog PARTITION(dt='2012-06-08', user_id)
> SELECT url, source_ip, user_id;
Instead, if we bucket the weblog table and use user_id as the bucketing column, the
value of this column will be hashed into a user-defined number of buckets. Records
with the same user_id will always be stored in the same bucket. Assuming the number
of users is much greater than the number of buckets, each bucket will have many users:
hive> CREATE TABLE weblog (user_id INT, url STRING, source_ip STRING)
> PARTITIONED BY (dt STRING)
> CLUSTERED BY (user_id) INTO 96 BUCKETS;
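Bucket assignment is a hash followed by a modulo. The Java sketch below assumes the classic scheme of older Hive releases for an INT column: the value hashes to itself, the sign bit is cleared, and the remainder modulo the bucket count picks the bucket. Newer releases may use a different hash function, and the class name is hypothetical:

```java
class BucketSketch {
    // Assumed classic scheme: INT hashes to its own value, sign bit cleared,
    // then modulo the number of buckets.
    static int bucketFor(int userId, int numBuckets) {
        int hash = userId; // assumed hash of an INT column
        return (hash & Integer.MAX_VALUE) % numBuckets;
    }

    // Rows per bucket when user ids 0..users-1 each contribute one row.
    static int[] bucketSizes(int users, int numBuckets) {
        int[] sizes = new int[numBuckets];
        for (int id = 0; id < users; id++) {
            sizes[bucketFor(id, numBuckets)]++;
        }
        return sizes;
    }
}
```

Under this assumption, user 100 and user 196 both land in bucket 4 of 96, and every row for a given user always goes to the same bucket, which is what makes bucketed joins and sampling by user_id possible.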
Setting hive.enforce.bucketing=true lets Hive pick the correct number of reducers when
inserting into a bucketed table. If we didn’t use the hive.enforce.bucketing property, we
would have to set the number of reducers to match the number of buckets, using
set mapred.reduce.tasks=96. Then the INSERT query would require a CLUSTER BY clause
after the SELECT clause.
Adding Columns to a Table:
Hive reads rows leniently against the table schema. For example, if a row has fewer columns than expected, the missing
columns will be returned as null. If the row has more columns than expected, they will
be ignored. Adding new columns to the schema involves a single ALTER TABLE ADD COLUMNS
command. This is very useful as log formats tend to only add more information to
a message:
hive> CREATE TABLE weblogs (version BIGINT, url STRING)
> PARTITIONED BY (hit_date int)
> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
hive> ! cat log1.txt
1 /mystuff
1 /toys
hive> LOAD DATA LOCAL INPATH 'log1.txt' INTO TABLE weblogs PARTITION (hit_date=20110101);
hive> SELECT * FROM weblogs;
1 /mystuff 20110101
1 /toys 20110101
Over time a new column may be added to the underlying data. In the following example
the column user_id is added to the data. Note that some older raw data files may not
have this column:
hive> ! cat log2.txt
2 /cars bob
2 /stuff terry
hive> ALTER TABLE weblogs ADD COLUMNS (user_id string);
hive> LOAD DATA LOCAL INPATH 'log2.txt' INTO TABLE weblogs PARTITION (hit_date=20110102);
hive> SELECT * FROM weblogs;
1 /mystuff 20110101 NULL
1 /toys 20110101 NULL
2 /cars 20110102 bob
2 /stuff 20110102 terry
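The lenient reading behavior (missing columns become null, extra columns are ignored) can be sketched in Java. This is a simplified, hypothetical reader, not Hive's own SerDe: it splits a tab-delimited line and pads or truncates it to the table's column count. The partition column hit_date is not stored in the data file, so the width counts only version, url, and user_id:

```java
class LenientRowSketch {
    // Reads a tab-delimited line as exactly `width` columns: missing trailing
    // columns become null and any extra columns are ignored.
    static String[] read(String line, int width) {
        String[] fields = line.split("\t", -1);
        String[] row = new String[width]; // Java initializes every slot to null
        for (int i = 0; i < width && i < fields.length; i++) {
            row[i] = fields[i];
        }
        return row;
    }
}
```

With a width of 3, the old line "1\t/mystuff" reads as {1, /mystuff, null}, matching the NULL user_id shown for the 20110101 rows above.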
(Almost) Always Use Compression!
In almost all cases, compression makes data smaller on disk, which usually makes
queries faster by reducing I/O overhead. Hive works seamlessly with many compression
types. The only compelling reason not to use compression is when the data produced
is intended for use by an external system, and an uncompressed format, such as text,
is the most compatible.
But compression and decompression consume CPU resources. MapReduce jobs tend
to be I/O bound, so the extra CPU overhead is usually not a problem. However, for
workflows that are CPU intensive, such as some machine-learning algorithms, compression
may actually reduce performance by stealing valuable CPU resources from
more essential operations.
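A quick way to see the I/O saving is to compress repetitive, log-like text with `java.util.zip`. This sketch measures only size, not the CPU time spent compressing, which is the trade-off discussed above. The sample lines and class name are hypothetical, and Hive itself is configured through Hadoop codec classes such as GzipCodec rather than by calling this code:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

class CompressionSketch {
    // Gzip-compressed size of text: a stand-in for how many bytes would have
    // to be written to and later read back from disk.
    static int gzipSize(String text) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buffer)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.size();
    }
}
```

Highly repetitive data, such as 1,000 copies of a 20-byte log line, compresses to a small fraction of its 20,000 bytes; less regular data compresses less, and the CPU cost is paid on every write and read.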
Tuning:
Using EXPLAIN:
hive> SELECT SUM(number) FROM onecol;
14
Now, put the EXPLAIN keyword in front of the last query to see the query plan and other
information. The query will not be executed.
hive> EXPLAIN SELECT SUM(number) FROM onecol;
The output requires some explaining and practice to understand.
First, the abstract syntax tree is printed. This shows how Hive parsed the query into
tokens and literals, as part of the first step in turning the query into the ultimate result:
ABSTRACT SYNTAX TREE:
(TOK_QUERY
(TOK_FROM (TOK_TABREF (TOK_TABNAME onecol)))
(TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))
(TOK_SELECT
(TOK_SELEXPR
(TOK_FUNCTION sum (TOK_TABLE_OR_COL number))))))
Even though our query will write its output to the console, Hive will actually write the
output to a temporary file first, as shown by this part of the output:
'(TOK_INSERT (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE))'
Hive has configuration properties to enable sampling of source data for use
with LIMIT:
<property>
<name>hive.limit.optimize.enable</name>
<value>true</value>
<description>Whether to enable to optimization to
try a smaller subset of data for simple LIMIT first.</description>
</property>
<property>
<name>hive.limit.row.max.size</name>
<value>100000</value>
<description>When trying a smaller subset of data for simple LIMIT,
how much size we need to guarantee each row to have at least.
</description>
</property>
<property>
<name>hive.limit.optimize.limit.file</name>
<value>10</value>
<description>When trying a smaller subset of data for simple LIMIT,
maximum number of files we can sample.</description>
</property>
You can explicitly enable local mode temporarily, as in this example:
hive> set oldjobtracker=${hiveconf:mapred.job.tracker};
hive> set mapred.job.tracker=local;
hive> set mapred.tmp.dir=/home/edward/tmp;
hive> SELECT * from people WHERE firstname='bob';
...
hive> set mapred.job.tracker=${oldjobtracker};
You can also let Hive apply local mode automatically by setting hive.exec.mode.local.auto
to true. To set this property permanently for all users, change the value in your $HIVE_HOME/conf/hive-site.xml:
<property>
<name>hive.exec.mode.local.auto</name>
<value>true</value>
<description>
Let hive determine whether to run in local mode automatically
</description>
</property>
Parallel Execution:
Setting hive.exec.parallel to true enables parallel execution, so that independent stages of a
query can run at the same time instead of one after another. Be careful in a shared
cluster, however. If a job is running more stages in parallel, it will increase its cluster
utilization:
<property>
<name>hive.exec.parallel</name>
<value>true</value>
<description>Whether to execute jobs in parallel</description>
</property>
Strict Mode:
Setting the property hive.mapred.mode to strict disables three types of queries.
First, queries on partitioned tables are not permitted unless they include a partition
filter in the WHERE clause, limiting their scope. In other words, you’re prevented from
issuing queries that will scan all partitions.
The second type of restricted query is one with an ORDER BY clause but no LIMIT clause.
Because ORDER BY sends all results to a single reducer to perform the ordering, forcing
the user to specify a LIMIT clause prevents the reducer from executing for an extended
period of time.
The third and final type of query prevented is a Cartesian product. Users coming from
the relational database world may expect that queries that perform a JOIN not with an
ON clause but with a WHERE clause will have the query optimized by the query planner,
effectively converting the WHERE clause into an ON clause. Unfortunately, Hive does not
perform this optimization, so a runaway query will occur if the tables are large.
Tuning the Number of Mappers and Reducers:
A balance is required. Having too many mapper or reducer tasks causes excessive overhead
in starting, scheduling, and running the job, while too few tasks means the
inherent parallelism of the cluster is underutilized.
Queries with a GROUP BY clause always require a reduce phase. In contrast, many other queries are
converted into map-only jobs.
The default value of hive.exec.reducers.bytes.per.reducer is 1 GB. Changing this
value to 750 MB causes Hive to estimate four reducers for this job:
hive> set hive.exec.reducers.bytes.per.reducer=750000000;
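The estimate can be sketched as a ceiling division. This assumes the classic heuristic: one reducer per hive.exec.reducers.bytes.per.reducer bytes of input, at least one reducer, and no more than hive.exec.reducers.max. The class name is hypothetical, and the input size used below is illustrative, since the job's actual input size is not shown here:

```java
class ReducerEstimate {
    // One reducer per bytesPerReducer of input, rounded up,
    // at least 1, and capped at maxReducers.
    static int estimate(long inputBytes, long bytesPerReducer, int maxReducers) {
        long reducers = (inputBytes + bytesPerReducer - 1) / bytesPerReducer; // ceiling division
        reducers = Math.max(1, reducers);
        return (int) Math.min(maxReducers, reducers);
    }
}
```

For a hypothetical 2.8 GB input, `estimate(2_800_000_000L, 1_000_000_000L, 999)` returns 3, while lowering the setting to 750 MB with `estimate(2_800_000_000L, 750_000_000L, 999)` returns 4.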
JVM Reuse:
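JVM reuse lets Hadoop run several tasks in the same JVM instead of starting a new JVM for
each task, which helps jobs with many short tasks. It is configured in mapred-site.xml (in $HADOOP_HOME/conf):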
<property>
<name>mapred.job.reuse.jvm.num.tasks</name>
<value>10</value>
<description>How many tasks to run per jvm. If set to -1, there is no limit.
</description>
</property>
A drawback of this feature is that JVM reuse will keep reserved task slots open until
the job completes, in case they are needed for reuse. If an “unbalanced” job has some
reduce tasks that run considerably longer than the others, the reserved slots will sit idle,
unavailable for other jobs, until the last task completes.
Indexes:
Indexes may be used to accelerate the calculation speed of a GROUP BY query.
Dynamic Partition Tuning:
Dynamic partition INSERT
statements enable a succinct SELECT statement to create many new partitions for insertion
into a partitioned table. First, set hive.exec.dynamic.partition.mode to
strict in your hive-site.xml, as discussed in “Strict Mode.” When strict mode is on, at least
one partition has to be static:
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>strict</value>
<description>In strict mode, the user must specify at least one
static partition in case the user accidentally overwrites all
partitions.</description>
</property>
Then, increase the other relevant properties to allow queries that will create a large
number of dynamic partitions, for example:
<property>
<name>hive.exec.max.dynamic.partitions</name>
<value>300000</value>
<description>Maximum number of dynamic partitions allowed to be
created in total.</description>
</property>
<property>
<name>hive.exec.max.dynamic.partitions.pernode</name>
<value>10000</value>
<description>Maximum number of dynamic partitions allowed to be
created in each mapper/reducer node.</description>
</property>
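A dynamic partition INSERT effectively groups the selected rows by the value of the partition column and writes one partition per distinct value, which is why the limits above matter. The following Java sketch shows that routing plus a cap in the spirit of hive.exec.max.dynamic.partitions. The class name, row layout, and exception are hypothetical; Hive enforces its real limits, including the per-node one, inside its execution engine:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DynamicPartitionSketch {
    // Groups rows {value, partitionKey} by partitionKey, creating one partition
    // per distinct key, and refuses to create more than maxPartitions.
    static Map<String, List<String>> route(List<String[]> rows, int maxPartitions) {
        Map<String, List<String>> partitions = new TreeMap<>();
        for (String[] row : rows) {
            if (!partitions.containsKey(row[1]) && partitions.size() >= maxPartitions) {
                throw new IllegalStateException("too many dynamic partitions, limit " + maxPartitions);
            }
            partitions.computeIfAbsent(row[1], k -> new ArrayList<>()).add(row[0]);
        }
        return partitions;
    }
}
```

Three rows with partition values CA, OR, and CA create two partitions; with a limit of one, the second distinct value triggers the failure.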
Speculative Execution:
Speculative execution is a feature of Hadoop that launches a certain number of duplicate
tasks. While this consumes more resources computing duplicate copies of data
that may be discarded, the goal of this feature is to improve overall job progress by
getting individual task results faster, and detecting and then blacklisting slow-running
TaskTrackers.
Speculative execution is controlled in the $HADOOP_HOME/conf/mapred-site.xml
file by the following two variables:
<property>
<name>mapred.map.tasks.speculative.execution</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks
may be executed in parallel.</description>
</property>
<property>
<name>mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>If true, then multiple instances of some reduce tasks
may be executed in parallel.</description>
</property>
However, Hive provides its own variable to control reduce-side speculative execution:
<property>
<name>hive.mapred.reduce.tasks.speculative.execution</name>
<value>true</value>
<description>Whether speculative execution for
reducers should be turned on. </description>
</property>
Single MapReduce Multi-GROUP BY
Another special optimization attempts to combine multiple GROUP BY operations in a
query into a single MapReduce job. For this optimization to work, a common set of
GROUP BY keys is required:
<property>
<name>hive.multigroupby.singlemr</name>
<value>false</value>
<description>Whether to optimize multi group by query to generate single M/R
job plan. If the multi group by query has common group by keys, it will be
optimized to generate single M/R job.</description>
</property>
Other File Formats and Compression:
Hive leverages Hadoop’s InputFormat APIs to read data from a variety
of sources, such as text files, sequence files, or even custom formats. Likewise, the
OutputFormat API is used to write data to various formats.
Determining Installed Codecs:
# hive -e "set io.compression.codecs"
io.compression.codecs=org.apache.hadoop.io.compress.GzipCodec,
org.apache.hadoop.io.compress.DefaultCodec,
org.apache.hadoop.io.compress.BZip2Codec,
org.apache.hadoop.io.compress.SnappyCodec
Choosing a Compression Codec:
Using compression has the advantage of minimizing the disk space required for files
and the overhead of disk and network I/O. However, compressing and decompressing files
increases CPU overhead. Therefore, compression is best used for I/O-bound jobs, where there is extra CPU capacity, or when disk space is at a premium.
Enabling Intermediate Compression:
<property>
<name>hive.exec.compress.intermediate</name>
<value>true</value>
<description> This controls whether intermediate files produced by Hive between
multiple map-reduce jobs are compressed. The compression codec and other options
are determined from hadoop config variables mapred.output.compress* </description>
</property>
Hadoop compression has a DefaultCodec. Changing the codec involves setting the
mapred.map.output.compression.codec property. This is a Hadoop variable and can be
set in the $HADOOP_HOME/conf/mapred-site.xml or the $HIVE_HOME/conf/
hive-site.xml. SnappyCodec is a good choice for intermediate compression because it
combines good compression performance with low CPU cost:
<property>
<name>mapred.map.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.SnappyCodec</value>
<description> This controls which codec is used to compress map output,
that is, the intermediate data shuffled between the map and reduce phases.
</description>
</property>
Final Output Compression:
Users can turn on final compression by setting the property to true on a
query-by-query basis or in their scripts:
<property>
<name>hive.exec.compress.output</name>
<value>false</value>
<description> This controls whether the final outputs of a query
(to a local/hdfs file or a Hive table) is compressed. The compression
codec and other options are determined from hadoop config variables
mapred.output.compress* </description>
</property>
If hive.exec.compress.output is set to true, a codec can be chosen. GZip compression is
a good choice for output compression because it typically reduces the size of files significantly,
but remember that GZipped files aren’t splittable by subsequent MapReduce
jobs:
<property>
<name>mapred.output.compression.codec</name>
<value>org.apache.hadoop.io.compress.GzipCodec</value>
<description>If the job outputs are compressed, how should they be compressed?
</description>
</property>
Sequence Files:
The sequence file format supported by Hadoop breaks a file into blocks and then optionally
compresses the blocks in a splittable way.
To use sequence files from Hive, add the STORED AS SEQUENCEFILE clause to a CREATE
TABLE statement:
CREATE TABLE a_sequence_file_table STORED AS SEQUENCEFILE;
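Sequence files support three compression types: NONE, RECORD, and BLOCK. BLOCK
compression is usually more efficient and still keeps the file splittable. Like many other
compression properties, this one is not Hive-specific. It can be defined in Hadoop’s
mapred-site.xml, in Hive’s hive-site.xml, or as needed in scripts or before individual queries: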
<property>
<name>mapred.output.compression.type</name>
<value>BLOCK</value>
<description>If the job outputs are to compressed as SequenceFiles,
how should they be compressed? Should be one of NONE, RECORD or BLOCK.
</description>
</property>
Compression in Action
First, let’s enable intermediate compression. This won’t affect the final output; however,
the job counters will show less physical data transferred for the job, since the shuffle
sort data was compressed.
We can also choose an intermediate compression codec other than the default codec. In
this case we chose GZIP, although Snappy is normally a better option. The first line is
wrapped for space:
hive> set mapred.map.output.compression.codec
=org.apache.hadoop.io.compress.GzipCodec;
hive> set hive.exec.compress.intermediate=true;
hive> CREATE TABLE intermediate_comp_on_gz
> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
> AS SELECT * FROM a;
Moving data to: file:/user/hive/warehouse/intermediate_comp_on_gz
Table default.intermediate_comp_on_gz stats:
[num_partitions: 0, num_files: 1, num_rows: 2, total_size: 8, raw_data_size: 6]
hive> dfs -cat /user/hive/warehouse/intermediate_comp_on_gz/000000_0;
4 5
3 2
Next, we can enable output compression:
hive> set hive.exec.compress.output=true;
hive> CREATE TABLE final_comp_on
> ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
> AS SELECT * FROM a;
Moving data to: file:/tmp/hive-edward/hive_2012-01-15_11-11-01_884_.../-ext-10001
Moving data to: file:/user/hive/warehouse/final_comp_on
Table default.final_comp_on stats:
[num_partitions: 0, num_files: 1, num_rows: 2, total_size: 16, raw_data_size: 6]
hive> dfs -ls /user/hive/warehouse/final_comp_on;
Found 1 items
/user/hive/warehouse/final_comp_on/000000_0.deflate
The output table statistics show that the total_size is 16, but the raw_data_size is 6.
The extra space is overhead for the deflate algorithm. We can also see that the output file
has a .deflate extension.
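Growth like 6 bytes of raw data becoming 16 bytes on disk is typical for tiny inputs. zlib-format output, which to our understanding is what Hadoop's DefaultCodec writes to .deflate files, always carries a 2-byte header and a 4-byte Adler-32 checksum, plus deflate block framing. This Java sketch uses `java.util.zip.Deflater` as an assumed stand-in for the codec, with a hypothetical class name, and shows that an 8-byte input comparable to the two sample rows comes out larger yet still round-trips intact:

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class DeflateOverheadSketch {
    // Compresses bytes into the zlib format (header, deflate data, Adler-32 trailer).
    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Reverses deflate(), showing the data is intact even though it grew.
    static byte[] inflate(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated zlib data");
                }
                out.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not zlib data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }
}
```

The fixed per-stream overhead is negligible for realistic file sizes; it only dominates for toy tables like this one.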
Trying to cat the file is not recommended, as you get binary output. However, Hive can
query this data normally.
Archive Partition:
The ALTER TABLE ... ARCHIVE PARTITION statement captures the files of a partition into a
Hadoop archive (HAR) file, as the data.har in the following output shows:
hive> SET hive.archive.enabled=true;
hive> ALTER TABLE hive_text ARCHIVE PARTITION (folder='docs');
intermediate.archived is
file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
intermediate.original is
file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ORIGINAL
Creating data.har for file:/user/hive/warehouse/hive_text/folder=docs
in file:/tmp/hive-edward/hive_..._3862901820512961909/-ext-10000/partlevel
Please wait... (this may take a while)
Moving file:/tmp/hive-edward/hive_..._3862901820512961909/-ext-10000/partlevel
to file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
Moving file:/user/hive/warehouse/hive_text/folder=docs
to file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ORIGINAL
Moving file:/user/hive/warehouse/hive_text/folder=docs_INTERMEDIATE_ARCHIVED
to file:/user/hive/warehouse/hive_text/folder=docs
Functions:
User-Defined Functions (UDFs) are a powerful feature that allow users to extend
HiveQL.
Functions may also contain extended documentation that can be accessed by adding
the EXTENDED keyword:
hive> DESCRIBE FUNCTION EXTENDED concat;
concat(str1, str2, ... strN) - returns the concatenation of str1, str2, ... strN
Returns NULL if any argument is NULL.
Example:
> SELECT concat('abc', 'def') FROM src LIMIT 1;
'abcdef'
Calling Functions
SELECT concat(column1,column2) AS x FROM table;
Table Generating Functions
Table generating functions take zero or more inputs and produce multiple columns or rows of output. The array function takes a list of arguments and returns the list as a
single array type. Suppose we start with this query using an array:
hive> SELECT array(1,2,3) FROM dual;
[1,2,3]
The explode() function is a UDTF that takes an array of input and iterates through the
list, returning each element from the list in a separate row.
hive> SELECT explode(array(1,2,3)) AS element FROM src;
1
2
3
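Suppose we want to list each manager-subordinate pair from the employees table, whose subordinates column is an array. Calling explode() next to another column fails: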
Example 13-1. Invalid use of explode
hive> SELECT name, explode(subordinates) FROM employees;
FAILED: Error in semantic analysis: UDTF's are not supported outside
the SELECT clause, nor nested in expressions
However, Hive offers a LATERAL VIEW feature to allow this kind of query:
hive> SELECT name, sub
> FROM employees
> LATERAL VIEW explode(subordinates) subView AS sub;
John Doe Mary Smith
John Doe Todd Jones
Mary Smith Bill King
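Conceptually, LATERAL VIEW joins each input row with the rows that the UDTF generates from it. The Java sketch below, with a hypothetical class name and in-memory input, emulates the query above by emitting one name/subordinate pair per array element. Employees whose subordinates array is empty produce no output, which, to our understanding, matches LATERAL VIEW without the OUTER keyword:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class LateralViewSketch {
    // Emulates SELECT name, sub FROM employees LATERAL VIEW explode(subordinates) subView AS sub:
    // each input row is paired with every element its array produces.
    static List<String> lateralExplode(Map<String, List<String>> subordinatesByName) {
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : subordinatesByName.entrySet()) {
            for (String sub : e.getValue()) {
                out.add(e.getKey() + "\t" + sub);
            }
        }
        return out;
    }
}
```

Passing an insertion-ordered map with John Doe, Mary Smith, and Todd Jones (no subordinates) yields the three pairs shown in the query output, in the same order.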
A UDF:
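To write a UDF, extend the UDF class and implement one or more evaluate() methods. This version parses a birthday such as 2-12-1981 and returns the matching zodiac sign:
package org.apache.hadoop.hive.contrib.udf.example;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import org.apache.hadoop.hive.ql.exec.UDF;
public class UDFZodiacSign extends UDF {
  // Sign that begins each month, and the first day of the next sign in that month.
  private static final String[] SIGNS = {"Capricorn", "Aquarius", "Pisces", "Aries",
    "Taurus", "Gemini", "Cancer", "Leo", "Virgo", "Libra", "Scorpio", "Sagittarius"};
  private static final int[] CUSP = {20, 19, 21, 20, 21, 21, 23, 23, 23, 23, 22, 22};
  private final SimpleDateFormat df = new SimpleDateFormat("MM-dd-yyyy"); // e.g. 2-12-1981
  public String evaluate(String bday) {
    Calendar cal = Calendar.getInstance();
    try {
      cal.setTime(df.parse(bday));
    } catch (Exception ex) {
      return null; // unparseable or NULL input becomes NULL
    }
    // MONTH is 0-based; DAY_OF_MONTH is needed (Date.getDay() returns the weekday).
    int month = cal.get(Calendar.MONTH), day = cal.get(Calendar.DAY_OF_MONTH);
    return day < CUSP[month] ? SIGNS[month] : SIGNS[(month + 1) % 12];
  }
}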
To use the UDF inside Hive, compile the Java code and package the UDF bytecode
class file into a JAR file. Then, in your Hive session, add the JAR to the classpath and
use a CREATE TEMPORARY FUNCTION statement to define a function that uses the Java class:
hive> ADD JAR /full/path/to/zodiac.jar;
hive> CREATE TEMPORARY FUNCTION zodiac
> AS 'org.apache.hadoop.hive.contrib.udf.example.UDFZodiacSign';
hive> SELECT name, bday, zodiac(bday) FROM littlebigdata;
edward capriolo 2-12-1981 Aquarius
Permanent Functions:
Your function may also be added permanently to Hive; however, this requires a small
modification to a Hive Java file and then rebuilding Hive.
Inside the Hive source code, a one-line change is required to the FunctionRegistry class
found at ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java. Then you
rebuild Hive following the instructions that come with the source distribution.
While it is recommended that you redeploy the entire new build, only the hive-exec-*.jar,
where * is the version number, needs to be replaced.
Here is an example change to FunctionRegistry where the new nvl() function is added
to Hive’s list of built-in functions:
...
registerUDF("parse_url", UDFParseUrl.class, false);
registerGenericUDF("nvl", GenericUDFNvl.class);
registerGenericUDF("split", GenericUDFSplit.class);
User-Defined Table Generating Functions
While UDFs can be used to return arrays or structures, they cannot return multiple
columns or multiple rows. User-Defined Table Generating Functions, or UDTFs, address
this need by providing a programmatic interface to return multiple columns and
even multiple rows.
Accessing the Distributed Cache from a UDF
UDFs may access files inside the distributed cache, the local filesystem, or even the
distributed filesystem. This access should be used cautiously as the overhead is
significant.
Streaming:
CLUSTER BY, DISTRIBUTE BY, SORT BY
Hive offers syntax to control how data is distributed and sorted. These features can be
used on most queries, but are particularly useful when doing streaming processes. For
example, data for the same key may need to be sent to the same processing node, or
data may need to be sorted by a specific column, or by a function. Hive provides several
ways to control this behavior.
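DISTRIBUTE BY controls which reducer receives each row, SORT BY sorts rows within each reducer, and CLUSTER BY is shorthand for both on the same column. This Java sketch imitates that behavior. The hash-modulo partitioner mirrors the formula of Hadoop's default HashPartitioner and is an assumption about how rows are assigned; the class name and keys are hypothetical:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class ClusterBySketch {
    // Imitates CLUSTER BY key with `reducers` reducers.
    static List<List<String>> clusterBy(List<String> keys, int reducers) {
        List<List<String>> perReducer = new ArrayList<>();
        for (int i = 0; i < reducers; i++) perReducer.add(new ArrayList<>());
        // DISTRIBUTE BY: rows with equal keys always land on the same reducer.
        for (String key : keys) {
            int r = (key.hashCode() & Integer.MAX_VALUE) % reducers; // HashPartitioner-style (assumption)
            perReducer.get(r).add(key);
        }
        // SORT BY: sort within each reducer only; there is no global ordering.
        for (List<String> rows : perReducer) Collections.sort(rows);
        return perReducer;
    }
}
```

With keys b, a, b, c, a and two reducers, both a rows end up on the same reducer and each reducer's list comes back sorted, but concatenating the reducers' outputs need not give a globally sorted result. That is the difference from ORDER BY, which funnels everything through a single reducer.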
Customizing Hive File and Record Formats:
Hive draws a clear distinction between the file format, which is how records are encoded in a
file, and the record format, which is how the stream of bytes for a given record is encoded in
the record.
In this book we have been using text files, with the default STORED AS TEXTFILE in CREATE
TABLE statements.
Locking:
Because of the write-once nature and the streaming style of MapReduce, access
to fine-grained locking is unnecessary. However, since Hadoop and Hive are multi-user systems, table-level locking and coordination are still valuable in some situations.
For example, if one user wishes to lock a table because an INSERT OVERWRITE query is changing its content, and a second user attempts to issue a query against the table at the same time, the query could fail or yield invalid results.