HIVE MODULES
All Hive installations require a metastore service, which Hive uses to store table schemas
and other metadata.
By default, Hive uses a built-in Derby SQL server, which provides limited, single-process
storage. For example, when using Derby, you can’t run two simultaneous instances
of the Hive CLI.
cd $HIVE_HOME (changes to the Hive home directory)
Local Mode Configuration
Recall that in local mode, all references to files go to your local filesystem, not the
distributed filesystem. There are no services running. Instead, your jobs run all tasks
in a single JVM instance.
You can also configure a different directory for Hive to store table data if you don’t
want to use the default location, which is file:///user/hive/warehouse for local mode
and hdfs://namenode_server/user/hive/warehouse for the other modes.
hive-default.xml.template: mostly default config
Changes to your configuration are done by editing the hive-site.xml file. Create
one if it doesn’t already exist.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/home/me/hive/warehouse</value>
<description>
Local or HDFS directory where Hive keeps table contents.
</description>
</property>
<property>
<name>hive.metastore.local</name>
<value>true</value>
<description>
Use false if a production metastore server is used.
</description>
</property>
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:derby:;databaseName=/home/me/hive/metastore_db;create=true</value>
<description>
The JDBC connection URL.
</description>
</property>
</configuration>
Distributed and Pseudodistributed Mode Configuration:
In distributed mode, several services run in the cluster. The JobTracker manages jobs
and the NameNode is the HDFS master. Worker nodes run individual job tasks, managed
by a TaskTracker service on each node, and also hold blocks for files in the
distributed filesystem, managed by DataNode services.
Variables and Properties
The --define key=value option is effectively equivalent to the --hivevar key=value
option. Both let you define on the command line custom variables that you can reference
in Hive scripts to customize execution. This feature is only supported in Hive
v0.8.0 and later versions.
Namespace  Access      Description
hivevar    Read/Write  (v0.8.0 and later) User-defined custom variables.
hiveconf   Read/Write  Hive-specific configuration properties.
system     Read/Write  Configuration properties defined by Java.
env        Read only   Environment variables defined by the shell environment (e.g., bash).
The .hiverc File
The last CLI option we’ll discuss is the -i file option, which lets you specify a file of
commands for the CLI to run as it starts, before showing you the prompt. Hive automatically
looks for a file named .hiverc in your HOME directory and runs the commands
it contains, if any.
Collection Data Types:
STRUCT:
Analogous to a C struct or an “object.” Fields can be accessed
using the “dot” notation. For example, if a column name is of
type STRUCT {first STRING; last STRING}, then
the first name field can be referenced using name.first.
(In C, a struct is a complex data type declaration that defines a physically grouped list of variables placed under one name in a block of memory, so that the different variables can be accessed via a single pointer or via the struct’s declared name, which returns the same address.)
eg: struct('John', 'Doe')
MAP:
A collection of key-value tuples, where the fields are accessed
using array notation (e.g., ['key']). For example, if a column
name is of type MAP with key→value pairs
'first'→'John' and 'last'→'Doe', then the last
name can be referenced using name['last'].
eg:
map('first', 'John', 'last', 'Doe')
ARRAY:
Ordered sequences of the same type that are indexable using
zero-based integers. For example, if a column name is of type
ARRAY of strings with the value ['John', 'Doe'], then
the second element can be referenced using name[1].
eg: array('John', 'Doe')
Text File Encoding of Data Values
Let’s begin our exploration of file formats by looking at the simplest example, text files.
You are no doubt familiar with text files delimited with commas or tabs, the so-called
comma-separated values (CSVs) or tab-separated values (TSVs), respectively.
\n:
For text files, each line is a record, so the line feed character separates records.
^A (“control” A):
Separates all fields (columns). Written using the octal code \001 when explicitly
specified in CREATE TABLE statements.
^B:
Separates the elements in an ARRAY or STRUCT, or the key-value pairs in a MAP.
Written using the octal code \002 when explicitly specified in CREATE TABLE
statements.
^C:
Separates the key from the corresponding value in MAP key-value pairs. Written using
the octal code \003 when explicitly specified in CREATE TABLE statements.
You can override these default delimiters. This might be necessary if another application
writes the data using a different convention. Here is the same table declaration
again, this time with all the format defaults explicitly specified:
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
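To make the delimiter nesting concrete, here is a minimal Python sketch (not Hive code) that splits one text record for the employees table using the default ^A, ^B, and ^C delimiters described above. The function name parse_employee and the sample record are hypothetical, made up for illustration only.

```python
# Hive's default text-file delimiters, as listed in the notes above.
FIELD_SEP = "\x01"  # ^A separates columns
ITEM_SEP = "\x02"   # ^B separates ARRAY/STRUCT elements and MAP entries
KEY_SEP = "\x03"    # ^C separates a MAP key from its value


def parse_employee(line):
    """Split one text record into the five columns of the employees table."""
    name, salary, subordinates, deductions, address = line.rstrip("\n").split(FIELD_SEP)
    street, city, state, zip_code = address.split(ITEM_SEP)
    return {
        "name": name,
        "salary": float(salary),
        "subordinates": subordinates.split(ITEM_SEP) if subordinates else [],
        "deductions": {
            key: float(value)
            for key, value in (
                entry.split(KEY_SEP) for entry in deductions.split(ITEM_SEP) if entry
            )
        },
        "address": {"street": street, "city": city, "state": state, "zip": int(zip_code)},
    }


# Hypothetical sample record, assembled with the same delimiters.
sample = FIELD_SEP.join([
    "John Doe",
    "100000.0",
    ITEM_SEP.join(["Mary Smith", "Todd Jones"]),
    ITEM_SEP.join(["Federal Taxes" + KEY_SEP + "0.2", "State Taxes" + KEY_SEP + "0.05"]),
    ITEM_SEP.join(["1 Michigan Ave.", "Chicago", "IL", "60600"]),
]) + "\n"

record = parse_employee(sample)
print(record["subordinates"][1])             # second ARRAY element
print(record["deductions"]["State Taxes"])   # MAP lookup by key
print(record["address"]["city"])             # STRUCT field
```

The three lookups at the end mirror the HiveQL forms subordinates[1], deductions['State Taxes'], and address.city.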
Schema on Read:
When you write data to a traditional database, either through loading external data,
writing the output of a query, doing UPDATE statements, etc., the database has total
control over the storage. The database is the “gatekeeper.” An important implication
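of this control is that the database can enforce the schema as data is written. This is
called schema on write.
Hive has no such control over the underlying storage, so it can only apply the schema
when data is read by a query. This is called schema on read.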
So what if the schema doesn’t match the file contents? Hive does the best that it can to
read the data. You will get lots of null values if there aren’t enough fields in each record
to match the schema. If some fields are numbers and Hive encounters nonnumeric
strings, it will return nulls for those fields. Above all else, Hive tries to recover from all
errors as best it can.
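The recovery behavior described above can be sketched in a few lines of Python. This is only an illustration of the idea, assuming a comma-delimited file; the function read_record, the schema, and the sample lines are hypothetical, and Hive's real conversion rules are more involved.

```python
def read_record(line, schema, delimiter=","):
    """Apply a schema to one line at read time; unreadable fields become None."""
    raw = line.rstrip("\n").split(delimiter)
    row = {}
    for index, (column, convert) in enumerate(schema):
        if index >= len(raw):
            row[column] = None   # not enough fields in the record
            continue
        try:
            row[column] = convert(raw[index])
        except ValueError:
            row[column] = None   # e.g. nonnumeric text in a numeric column
    return row


# Hypothetical schema and data, for illustration only.
schema = [("symbol", str), ("price_close", float), ("volume", int)]
good = read_record("AAPL,195.46,1000", schema)
short_and_dirty = read_record("AAPL,n/a", schema)
print(good)
print(short_and_dirty)
```

The second record is both too short and contains a nonnumeric price, yet reading it does not fail; the affected columns simply come back as nulls.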
Databases in Hive:
The Hive concept of a database is essentially just a catalog or namespace of tables.
However, databases are very useful for larger clusters with multiple teams and users, as a
way of avoiding table name collisions. It’s also common to use databases to organize
production tables into logical groups.
hive> CREATE DATABASE IF NOT EXISTS financials;
hive> SHOW DATABASES;
default
financials
hive> CREATE DATABASE human_resources;
hive> SHOW DATABASES;
default
financials
human_resources
You can override this default location for the new directory as shown in this example:
hive> CREATE DATABASE financials
> LOCATION '/my/preferred/directory';
hive> CREATE DATABASE financials
> COMMENT 'Holds all financial tables';
hive> DESCRIBE DATABASE financials;
financials Holds all financial tables
hdfs://master-server/user/hive/warehouse/financials.db
hive> CREATE DATABASE financials
> WITH DBPROPERTIES ('creator' = 'Mark Moneybags', 'date' = '2012-01-02');
hive> DESCRIBE DATABASE financials;
financials hdfs://master-server/user/hive/warehouse/financials.db
hive> DESCRIBE DATABASE EXTENDED financials;
financials hdfs://master-server/user/hive/warehouse/financials.db
{date=2012-01-02, creator=Mark Moneybags}
The USE command sets a database as your working database, analogous to changing
working directories in a filesystem:
hive> USE financials;
hive> set hive.cli.print.current.db=true;
hive (financials)> USE default;
hive (default)> set hive.cli.print.current.db=false;
hive> ...
Finally, you can drop a database:
hive> DROP DATABASE IF EXISTS financials;
The IF EXISTS is optional and suppresses warnings if financials doesn’t exist.
By default, Hive won’t permit you to drop a database if it contains tables. You can either
drop the tables first or append the CASCADE keyword to the command, which will cause
Hive to drop the tables in the database first:
hive> DROP DATABASE IF EXISTS financials CASCADE;
Using the RESTRICT keyword instead of CASCADE is equivalent to the default behavior,
where existing tables must be dropped before dropping the database.
When a database is dropped, its directory is also deleted.
Alter Database
You can set key-value pairs in the DBPROPERTIES associated with a database using the
ALTER DATABASE command. No other metadata about the database can be changed,
including its name and directory location:
hive> ALTER DATABASE financials SET DBPROPERTIES ('edited-by' = 'Joe Dba');
CREATE TABLE IF NOT EXISTS mydb.employees (
name STRING COMMENT 'Employee name',
salary FLOAT COMMENT 'Employee salary',
subordinates ARRAY<STRING> COMMENT 'Names of subordinates',
deductions MAP<STRING, FLOAT>
COMMENT 'Keys are deductions names, values are percentages',
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
COMMENT 'Home address')
COMMENT 'Description of the table'
TBLPROPERTIES ('creator'='me', 'created_at'='2012-01-02 10:00:00', ...)
LOCATION '/user/hive/warehouse/mydb.db/employees';
We can also use the DESCRIBE EXTENDED mydb.employees command to show details about
the table. (We can drop the mydb. prefix if we’re currently using the mydb database.) We
have reformatted the output for easier reading and we have suppressed many details
to focus on the items that interest us now:
hive> DESCRIBE EXTENDED mydb.employees;
name string Employee name
salary float Employee salary
subordinates array<string> Names of subordinates
deductions map<string,float> Keys are deductions names, values are percentages
address struct<street:string,city:string,state:string,zip:int> Home address
Detailed Table Information Table(tableName:employees, dbName:mydb, owner:me,
...
location:hdfs://master-server/user/hive/warehouse/mydb.db/employees,
parameters:{creator=me, created_at='2012-01-02 10:00:00',
last_modified_user=me, last_modified_time=1337544510,
comment:Description of the table, ...}, ...)
Replacing EXTENDED with FORMATTED provides more readable but also more verbose
output.
Managed Tables:
The tables we have created so far are called managed tables, or sometimes internal
tables, because Hive controls the lifecycle of their data (more or less). As we’ve seen,
Hive stores the data for these tables in a subdirectory under the directory defined by
hive.metastore.warehouse.dir (e.g., /user/hive/warehouse), by default.
External Tables
Because the table is external, Hive does not assume it owns the data. Therefore, dropping the
table does not delete the data, although the metadata for the table will be deleted.
The following table declaration creates an external table that can read all the data files
for this comma-delimited data in /data/stocks:
CREATE EXTERNAL TABLE IF NOT EXISTS stocks (
exchange STRING,
symbol STRING,
ymd STRING,
price_open FLOAT,
price_high FLOAT,
price_low FLOAT,
price_close FLOAT,
volume INT,
price_adj_close FLOAT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/data/stocks';
Even for managed tables, you know where
they are located, so you can use other tools, hadoop dfs commands, etc., to modify and
even delete the files in the directories for managed tables. Hive may technically own
these directories and files, but it doesn’t have full control over them!
Partitioned, Managed Tables:
Partitioning is used for distributing load horizontally, moving data physically closer
to its most frequent users, and other purposes.
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING);
Partitioning tables changes how Hive structures the data storage. If we create this table
in the mydb database, there will still be an employees directory for the table:
hdfs://master_server/user/hive/warehouse/mydb.db/employees
However, Hive will now create subdirectories reflecting the partitioning structure. For
example:
...
.../employees/country=CA/state=AB
.../employees/country=CA/state=BC
...
.../employees/country=US/state=AL
.../employees/country=US/state=AK
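The key=value directory layout shown above can be reproduced with a short Python sketch. The helper names partition_path and partition_values are hypothetical; the table directory is the one from the example.

```python
def partition_path(table_dir, partition_spec):
    """Build the key=value subdirectory path for one partition."""
    parts = [f"{key}={value}" for key, value in partition_spec]
    return "/".join([table_dir.rstrip("/")] + parts)


def partition_values(path, table_dir):
    """Recover the partition column values from a partition directory path."""
    relative = path[len(table_dir.rstrip("/")):].strip("/")
    return dict(segment.split("=", 1) for segment in relative.split("/"))


table_dir = "hdfs://master_server/user/hive/warehouse/mydb.db/employees"
path = partition_path(table_dir, [("country", "CA"), ("state", "AB")])
print(path)
print(partition_values(path, table_dir))
```

Because the partition values live in the directory names, a query that filters on country and state only needs to read the matching subdirectories.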
When we add predicates to WHERE clauses that filter on partition values, these predicates
are called partition filters.
However, a query across all partitions could trigger an enormous MapReduce job if the
table data and number of partitions are large.
A highly suggested safety measure is
putting Hive into “strict” mode, which prohibits queries of partitioned tables without
a WHERE clause that filters on partitions. You can set the mode to “nonstrict,” as in the
following session:
hive> set hive.mapred.mode=strict;
hive> SELECT e.name, e.salary FROM employees e LIMIT 100;
FAILED: Error in semantic analysis: No partition predicate found for
Alias "e" Table "employees"
hive> set hive.mapred.mode=nonstrict;
hive> SELECT e.name, e.salary FROM employees e LIMIT 100;
You can see the partitions that exist with the SHOW PARTITIONS command:
hive> SHOW PARTITIONS employees;
...
country=CA/state=AB
country=CA/state=BC
...
country=US/state=AL
country=US/state=AK
...
If you have a lot of partitions and you want to see if partitions have been defined for
particular partition keys, you can further restrict the command with an optional
PARTITION clause that specifies one or more of the partitions with specific values:
hive> SHOW PARTITIONS employees PARTITION(country='US');
country=US/state=AL
country=US/state=AK
...
hive> SHOW PARTITIONS employees PARTITION(country='US', state='AK');
country=US/state=AK
The DESCRIBE EXTENDED employees command shows the partition keys:
hive> DESCRIBE EXTENDED employees;
name string,
salary float,
...
address struct<...>,
country string,
state string
Detailed Table Information...
partitionKeys:[FieldSchema(name:country, type:string, comment:null),
FieldSchema(name:state, type:string, comment:null)],
manual Partition:
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
CREATE EXTERNAL TABLE IF NOT EXISTS log_messages (
hms INT,
severity STRING,
server STRING,
process_id INT,
message STRING)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';
As for managed partitioned tables, you can see an external table’s partitions with SHOW
PARTITIONS:
hive> SHOW PARTITIONS log_messages;
...
year=2011/month=12/day=31
year=2012/month=1/day=1
year=2012/month=1/day=2
Customizing Table Storage Formats:
Hive defaults to
a text file format, which is indicated by the optional clause STORED AS TEXTFILE, and
you can override the default values for the various delimiters when creating the table.
Here we repeat the definition of the employees table we used in that discussion:
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
You can replace TEXTFILE with one of the other built-in file formats supported by Hive,
including SEQUENCEFILE and RCFILE, both of which optimize disk space usage and I/O
bandwidth performance using binary encoding and optional compression.
The record encoding is handled by an input format object (e.g., the Java code behind
TEXTFILE). Hive uses a Java class (compiled module) named
org.apache.hadoop.mapred.TextInputFormat.
The record parsing is handled by a serializer/deserializer, or SerDe for short.
For TEXTFILE and its encoding, the SerDe Hive uses is another Java class called
org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.
CREATE TABLE kst
PARTITIONED BY (ds string)
ROW FORMAT SERDE 'com.linkedin.haivvreo.AvroSerDe'
WITH SERDEPROPERTIES ('schema.url'='http://schema_provider/kst.avsc')
STORED AS
INPUTFORMAT 'com.linkedin.haivvreo.AvroContainerInputFormat'
OUTPUTFORMAT 'com.linkedin.haivvreo.AvroContainerOutputFormat';
Dropping Tables:
DROP TABLE IF EXISTS employees;
The IF EXISTS keywords are optional. If not used and the table doesn’t exist, Hive
returns an error.
Actually, if you enable the Hadoop Trash feature, which is not on by
default, the data is moved to the .Trash directory in the distributed
filesystem for the user, which in HDFS is /user/$USER/.Trash. To enable
this feature, set the property fs.trash.interval to a reasonable positive
number. It’s the number of minutes between “trash checkpoints”; 1,440
would be 24 hours. While it’s not guaranteed to work for all versions of
all distributed filesystems, if you accidentally drop a managed table with
important data, you may be able to re-create the table, re-create any
partitions, and then move the files from .Trash to the correct directories
(using the filesystem commands) to restore the data.
Alter Table:
Most table properties can be altered with ALTER TABLE statements, which change
metadata about the table but not the data itself. These statements can be used to fix
mistakes in schema, move partition locations, and perform other operations.
Use this statement to rename the table log_messages to logmsgs:
ALTER TABLE log_messages RENAME TO logmsgs;
Adding, Modifying, and Dropping a Table Partition
As we saw previously, ALTER TABLE table ADD PARTITION … is used to add a new partition
to a table (usually an external table). Here we repeat the same command shown previously
with the additional options available:
ALTER TABLE log_messages ADD IF NOT EXISTS
PARTITION (year = 2011, month = 1, day = 1) LOCATION '/logs/2011/01/01'
PARTITION (year = 2011, month = 1, day = 2) LOCATION '/logs/2011/01/02'
PARTITION (year = 2011, month = 1, day = 3) LOCATION '/logs/2011/01/03';
Changing Columns:
You can rename a column, change its position, type, or comment:
ALTER TABLE log_messages
CHANGE COLUMN hms hours_minutes_seconds INT
COMMENT 'The hours, minutes, and seconds part of the timestamp'
AFTER severity;
Adding Columns
You can add new columns to the end of the existing columns, before any partition
columns.
ALTER TABLE log_messages ADD COLUMNS (
app_name STRING COMMENT 'Application name',
session_id BIGINT COMMENT 'The current session id');
Deleting or Replacing Columns
The following example removes all the existing columns and replaces them with the
new columns specified:
ALTER TABLE log_messages REPLACE COLUMNS (
hours_mins_secs INT COMMENT 'hour, minute, seconds from timestamp',
severity STRING COMMENT 'The message severity',
message STRING COMMENT 'The rest of the message');
Alter Table Properties
You can add additional table properties or modify existing properties, but not remove
them:
ALTER TABLE log_messages SET TBLPROPERTIES (
'notes' = 'The process id is no longer captured; this column is always NULL');
Alter Storage Properties
There are several ALTER TABLE statements for modifying format and SerDe properties.
The following statement changes the storage format for a partition to be SEQUENCEFILE:
ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1)
SET FILEFORMAT SEQUENCEFILE;
You can specify a new SerDe along with SerDe properties or change the properties for
the existing SerDe. The following example specifies that a table will use a Java class
named com.example.JSONSerDe to process a file of JSON-encoded records:
ALTER TABLE table_using_JSON_storage
SET SERDE 'com.example.JSONSerDe'
WITH SERDEPROPERTIES (
'prop1' = 'value1',
'prop2' = 'value2');
Miscellaneous Alter Table Statements
ALTER TABLE log_messages TOUCH
PARTITION(year = 2012, month = 1, day = 1);
The PARTITION clause is required for partitioned tables. A typical scenario for this statement
is to trigger execution of the hooks when table storage files have been modified
outside of Hive. For example, a script that has just written new files for the 2012/01/01
partition for log_messages can make the following call to the Hive CLI:
hive -e 'ALTER TABLE log_messages TOUCH PARTITION(year = 2012, month = 1, day = 1);'
The ALTER TABLE … ARCHIVE PARTITION statement captures the partition files into a Hadoop
archive (HAR) file. This only reduces the number of files in the filesystem, reducing
the load on the NameNode, but doesn’t provide any space savings (e.g., through
compression):
ALTER TABLE log_messages ARCHIVE
PARTITION(year = 2012, month = 1, day = 1);
To reverse the operation, substitute UNARCHIVE for ARCHIVE. This feature is only available
for individual partitions of partitioned tables.
Finally, various protections are available. The following statements prevent the partition
from being dropped and queried:
ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1) ENABLE NO_DROP;
ALTER TABLE log_messages
PARTITION(year = 2012, month = 1, day = 1) ENABLE OFFLINE;
HiveQL: Data Manipulation
Loading Data into Managed Tables
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country = 'US', state = 'CA');
If you specify the OVERWRITE keyword, any data already present in the target directory
will be deleted first. Without the keyword, the new files are simply added to the target
directory. However, if files already exist in the target directory that match filenames
being loaded, the old files are overwritten.
Inserting Data into Tables from Queries:
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * FROM staged_employees se
WHERE se.cnty = 'US' AND se.st = 'OR';
With OVERWRITE, any previous contents of the partition (or whole table if not partitioned)
are replaced.
If you drop the keyword OVERWRITE or replace it with INTO, Hive appends the data rather
than replaces it.
FROM staged_employees se
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'OR')
SELECT * WHERE se.cnty = 'US' AND se.st = 'OR'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'CA')
SELECT * WHERE se.cnty = 'US' AND se.st = 'CA'
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state = 'IL')
SELECT * WHERE se.cnty = 'US' AND se.st = 'IL';
Dynamic Partition Inserts:
Consider this change to the previous example:
INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cnty, se.st
FROM staged_employees se;
You can also mix dynamic and static partitions. This variation of the previous query
specifies a static value for the country (US) and a dynamic value for the state:
INSERT OVERWRITE TABLE employees
PARTITION (country = 'US', state)
SELECT ..., se.cnty, se.st
FROM staged_employees se
WHERE se.cnty = 'US';
Dynamic partitioning is not enabled by default. When it is enabled, it works in “strict”
mode by default, where it expects at least some columns to be static. This helps protect
against a badly designed query that generates a gigantic number of partitions. For example,
you partition by timestamp and generate a separate partition for each second!
Perhaps you meant to partition by day or maybe hour instead. Several other properties
are also used to limit excess resource utilization.
hive> set hive.exec.dynamic.partition=true;
hive> set hive.exec.dynamic.partition.mode=nonstrict;
hive> set hive.exec.max.dynamic.partitions.pernode=1000;
hive> INSERT OVERWRITE TABLE employees
> PARTITION (country, state)
> SELECT ..., se.cnty, se.st
> FROM staged_employees se;
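A rough Python sketch of what a dynamic partition insert does: the trailing columns of each selected row are taken, by position, as the partition key, and a cap on the number of partitions guards against runaway queries. The function dynamic_partitions, its max_partitions parameter, and the rows are hypothetical; the cap only loosely mirrors the hive.exec.max.dynamic.partitions.pernode setting used above.

```python
from collections import defaultdict


def dynamic_partitions(rows, num_partition_columns, max_partitions=1000):
    """Group rows by their trailing columns, as a dynamic insert derives partitions."""
    partitions = defaultdict(list)
    for row in rows:
        key = tuple(row[-num_partition_columns:])
        data = tuple(row[:-num_partition_columns])
        # Refuse to create more partitions than the configured cap.
        if key not in partitions and len(partitions) >= max_partitions:
            raise RuntimeError(f"more than {max_partitions} partitions requested")
        partitions[key].append(data)
    return dict(partitions)


# Hypothetical staged rows: (name, salary, cnty, st).
rows = [
    ("John Doe", 100000.0, "US", "IL"),
    ("Mary Smith", 80000.0, "US", "IL"),
    ("Pat Example", 70000.0, "CA", "BC"),
]
result = dynamic_partitions(rows, num_partition_columns=2)
for key, data in result.items():
    print(key, data)
```

If the partition key were a per-second timestamp, nearly every row would open a new partition, which is exactly the situation the limits are meant to stop.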
Creating Tables and Loading Them in One Query
You can also create a table and insert query results into it in one statement:
CREATE TABLE ca_employees
AS SELECT name, salary, address
FROM employees
WHERE state = 'CA';
Exporting Data:
hadoop fs -cp source_path target_path
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
SELECT name, salary, address
FROM employees
WHERE state = 'CA';
hive> ! ls /tmp/ca_employees;
000000_0
hive> ! cat /tmp/ca_employees/000000_0
John Doe100000.0201 San Antonio CircleMountain ViewCA94040
Mary Smith80000.01 Infinity LoopCupertinoCA95014
...
HiveQL: Queries:
SELECT … FROM Clauses:
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING);
hive> SELECT name, salary FROM employees;
John Doe 100000.0
Mary Smith 80000.0
--subordinates is array
hive> SELECT name, subordinates FROM employees;
John Doe ["Mary Smith","Todd Jones"]
Mary Smith ["Bill King"]
Todd Jones []
Bill King []
The deductions is a MAP, where the JSON representation for maps is used, namely a
comma-separated list of key:value pairs, surrounded with {...}:
hive> SELECT name, deductions FROM employees;
John Doe {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1}
Mary Smith {"Federal Taxes":0.2,"State Taxes":0.05,"Insurance":0.1}
Todd Jones {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}
Bill King {"Federal Taxes":0.15,"State Taxes":0.03,"Insurance":0.1}
Finally, the address is a STRUCT, which is also written using the JSON map format:
hive> SELECT name, address FROM employees;
John Doe {"street":"1 Michigan Ave.","city":"Chicago","state":"IL","zip":60600}
Mary Smith {"street":"100 Ontario St.","city":"Chicago","state":"IL","zip":60601}
Todd Jones {"street":"200 Chicago Ave.","city":"Oak Park","state":"IL","zip":60700}
Bill King {"street":"300 Obscure Dr.","city":"Obscuria","state":"IL","zip":60100}
First, ARRAY indexing is 0-based, as in Java. Here is a query that selects the first element
of the subordinates array:
hive> SELECT name, subordinates[0] FROM employees;
John Doe Mary Smith
Mary Smith Bill King
Todd Jones NULL
Bill King NULL
To reference a MAP element, you also use ARRAY[...] syntax, but with key values instead
of integer indices:
hive> SELECT name, deductions["State Taxes"] FROM employees;
John Doe 0.05
Mary Smith 0.05
Todd Jones 0.03
Bill King 0.03
Finally, to reference an element in a STRUCT, you use “dot” notation, similar to the
table_alias.column mentioned above:
hive> SELECT name, address.city FROM employees;
John Doe Chicago
Mary Smith Chicago
Todd Jones Oak Park
Bill King Obscuria
These same referencing techniques are also used in WHERE clauses.
Specify Columns with Regular Expressions
We can even use regular expressions to select the columns we want. The following query
selects the symbol column and all columns from stocks whose names start with the
prefix price:
hive> SELECT symbol, `price.*` FROM stocks;
AAPL 195.69 197.88 194.0 194.12 194.12
AAPL 192.63 196.0 190.85 195.46 195.46
AAPL 196.73 198.37 191.57 192.05 192.05
AAPL 195.17 200.2 194.42 199.23 199.23
AAPL 195.91 196.32 193.38 195.86 195.86
LIMIT Clause:
hive> SELECT upper(name), salary, deductions["Federal Taxes"],
> round(salary * (1 - deductions["Federal Taxes"])) FROM employees
> LIMIT 2;
JOHN DOE 100000.0 0.2 80000
MARY SMITH 80000.0 0.2 64000
Column Aliases
hive> SELECT upper(name), salary, deductions["Federal Taxes"] as fed_taxes,
> round(salary * (1 - deductions["Federal Taxes"])) as salary_minus_fed_taxes
> FROM employees LIMIT 2;
JOHN DOE 100000.0 0.2 80000
MARY SMITH 80000.0 0.2 64000
Nested SELECT Statements
The column alias feature is especially useful in nested select statements. Let’s use the
previous example as a nested query:
hive> FROM (
> SELECT upper(name) as name, salary, deductions["Federal Taxes"] as fed_taxes,
> round(salary * (1 - deductions["Federal Taxes"])) as salary_minus_fed_taxes
> FROM employees
> ) e
> SELECT e.name, e.salary_minus_fed_taxes
> WHERE e.salary_minus_fed_taxes > 70000;
JOHN DOE 80000
CASE … WHEN … THEN Statements:
The CASE … WHEN … THEN clauses are like if statements for individual columns in query
results. For example:
hive> SELECT name, salary,
> CASE
> WHEN salary < 50000.0 THEN 'low'
> WHEN salary >= 50000.0 AND salary < 70000.0 THEN 'middle'
> WHEN salary >= 70000.0 AND salary < 100000.0 THEN 'high'
> ELSE 'very high'
> END AS bracket FROM employees;
John Doe 100000.0 very high
Mary Smith 80000.0 high
Todd Jones 70000.0 high
Bill King 60000.0 middle
Boss Man 200000.0 very high
Fred Finance 150000.0 very high
Stacy Accountant 60000.0 middle
When Hive Can Avoid MapReduce
Furthermore, Hive will attempt to run other operations in local mode if the
hive.exec.mode.local.auto property is set to true:
set hive.exec.mode.local.auto=true;
WHERE Clauses
While SELECT clauses select columns, WHERE clauses are filters; they select which records
to return.
Gotchas with Floating-Point Comparisons
A common gotcha arises when you compare floating-point numbers of different types
(i.e., FLOAT versus DOUBLE). Consider the following query of the employees table, which
is designed to return the employee’s name, salary, and federal taxes deduction, but only
if that tax deduction exceeds 0.2 (20%) of his or her salary:
hive> SELECT name, salary, deductions['Federal Taxes']
> FROM employees WHERE deductions['Federal Taxes'] > 0.2;
John Doe 100000.0 0.2
Mary Smith 80000.0 0.2
Boss Man 200000.0 0.3
Fred Finance 150000.0 0.3
Wait! Why are records with deductions['Federal Taxes'] = 0.2 being returned?
Is it a Hive bug? There is a bug filed against Hive for this issue, but it actually reflects
the behavior of the internal representation of floating-point numbers when they are
compared and it affects almost all software written in most languages on all modern
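digital computers.
When you write a floating-point literal like 0.2, Hive uses a DOUBLE to hold the value.
The deductions map values are FLOAT, so Hive implicitly converts the stored value to
DOUBLE for the comparison. This looks like it should work, but it doesn’t. Here’s why.
The number 0.2 can’t be represented exactly in a FLOAT or DOUBLE.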
To simplify things a bit, let’s say that 0.2 is actually 0.2000001 for FLOAT and
0.200000000001 for DOUBLE, because an 8-byte DOUBLE has more significant digits (after
the decimal point). When the FLOAT value from the table is converted to DOUBLE by Hive,
it produces the DOUBLE value 0.200000100000, which is greater than 0.200000000001.
That’s why the query results appear to use >= not >!
This issue is not unique to Hive or to Java, in which Hive is implemented. Rather, it’s a
general problem for all systems that use the IEEE standard for encoding floating-point
numbers!
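The effect is easy to reproduce outside Hive. The following Python sketch uses the standard struct module to round 0.2 to a 32-bit float and widen it back to a 64-bit double, which is what happens when a FLOAT column value is compared with a DOUBLE literal. The helper name to_float32 is hypothetical.

```python
import struct


def to_float32(value):
    """Round a Python float (an IEEE 754 double) to 32 bits, then widen it back."""
    return struct.unpack("<f", struct.pack("<f", value))[0]


stored = to_float32(0.2)   # what a FLOAT column holds, widened to DOUBLE
literal = 0.2              # what a DOUBLE literal holds

print(repr(stored))
print(stored > literal)              # True: the widened FLOAT is slightly larger
print(stored > to_float32(literal))  # False: both sides rounded the same way
```

The last line corresponds to casting the literal to FLOAT so that both operands carry the same rounding error.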
We could use DOUBLE instead of FLOAT in our schema. Then we would be
comparing a DOUBLE for the deductions['Federal Taxes'] with a double for the literal
0.2. However, this change will increase the memory footprint of our queries. Also, we
can’t simply change the schema like this if the data file is a binary file format like
SEQUENCEFILE.
Here is a modified query that casts the 0.2 literal value to FLOAT. With this change, the
expected results are returned by the query:
hive> SELECT name, salary, deductions['Federal Taxes'] FROM employees
> WHERE deductions['Federal Taxes'] > cast(0.2 AS FLOAT);
Boss Man 200000.0 0.3
Fred Finance 150000.0 0.3
Note the syntax inside the cast operator: number AS FLOAT.
LIKE and RLIKE:
hive> SELECT name, address.street FROM employees WHERE address.street LIKE '%Ave.';
John Doe 1 Michigan Ave.
Todd Jones 200 Chicago Ave.
A Hive extension is the RLIKE clause, which lets us use Java regular expressions, a more
powerful minilanguage for specifying matches.
hive> SELECT name, address.street
> FROM employees WHERE address.street RLIKE '.*(Chicago|Ontario).*';
Mary Smith 100 Ontario St.
Todd Jones 200 Chicago Ave.
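The difference between the two operators can be sketched in Python. The helper like_to_regex is hypothetical and assumes the usual SQL LIKE wildcards (% for any run of characters, _ for exactly one). Python's re module stands in for Java regular expressions here, which is safe for these simple patterns but not for every Java regex feature; RLIKE is modeled as a match anywhere in the string.

```python
import re


def like_to_regex(pattern):
    """Translate a SQL LIKE pattern into an equivalent compiled regular expression."""
    parts = []
    for char in pattern:
        if char == "%":
            parts.append(".*")
        elif char == "_":
            parts.append(".")
        else:
            parts.append(re.escape(char))  # everything else is literal, including "."
    return re.compile("".join(parts), re.DOTALL)


streets = ["1 Michigan Ave.", "100 Ontario St.", "200 Chicago Ave.", "300 Obscure Dr."]

like = like_to_regex("%Ave.")
like_matches = [s for s in streets if like.fullmatch(s)]

rlike = re.compile(".*(Chicago|Ontario).*")
rlike_matches = [s for s in streets if rlike.search(s)]

print(like_matches)
print(rlike_matches)
```

Note that the period in '%Ave.' is a literal character under LIKE, whereas in an RLIKE pattern an unescaped period matches any character.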
GROUP BY Clauses:
The GROUP BY statement is often used in conjunction with aggregate functions to
group the result set by one or more columns and then perform an aggregation over each
group.
hive> SELECT year(ymd), avg(price_close) FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd);
1984 25.578625440597534
1985 20.193676221040867
1986 32.46102808021274
1987 53.88968399108163
1988 41.540079275138766
1989 41.65976212516664
1990 37.56268799823263
1991 52.49553383386182
1992 54.80338610251119
1993 41.02671956450572
1994 34.0813495847914
HAVING Clauses
The HAVING clause lets you constrain the groups produced by GROUP BY in a way that
could be expressed with a subquery, using a syntax that’s easier to express. Here’s the
previous query with an additional HAVING clause that limits the results to years where
the average closing price was greater than $50.0:
hive> SELECT year(ymd), avg(price_close) FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd)
> HAVING avg(price_close) > 50.0;
1987 53.88968399108163
1991 52.49553383386182
1992 54.80338610251119
1999 57.77071460844979
2000 71.74892876261757
2005 52.401745992993554
Without the HAVING clause, this query would require a nested SELECT statement:
hive> SELECT s2.year, s2.avg FROM
> (SELECT year(ymd) AS year, avg(price_close) AS avg FROM stocks
> WHERE exchange = 'NASDAQ' AND symbol = 'AAPL'
> GROUP BY year(ymd)) s2
> WHERE s2.avg > 50.0;
1987 53.88968399108163
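As a rough illustration of the order of operations, the Python sketch below (not HiveQL) groups rows by year, averages each group, and only then applies the HAVING-style filter. The rows are hypothetical values chosen for easy arithmetic, not the real stocks data.

```python
from collections import defaultdict

# Hypothetical (year, closing price) rows; not the real stocks data.
rows = [(1987, 60.0), (1987, 50.0), (1988, 40.0), (1988, 44.0), (1991, 52.0)]

# GROUP BY year: collect the prices for each year.
groups = defaultdict(list)
for year, price_close in rows:
    groups[year].append(price_close)

# avg(price_close) for each group.
averages = {year: sum(p) / len(p) for year, p in groups.items()}

# HAVING avg(price_close) > 50.0: filter the groups, not the input rows.
having = {year: avg for year, avg in averages.items() if avg > 50.0}
print(sorted(having.items()))   # [(1987, 55.0), (1991, 52.0)]
```

A WHERE clause, by contrast, would filter the individual rows before they are grouped.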
JOIN Statements:
Hive supports the classic SQL JOIN statement, but only equi-joins are supported.
Inner JOIN
In an inner JOIN, records are discarded unless the join criteria find matching records in
every table being joined.
hive> SELECT a.ymd, a.price_close, b.price_close
> FROM stocks a JOIN stocks b ON a.ymd = b.ymd
> WHERE a.symbol = 'AAPL' AND b.symbol = 'IBM';
2010-01-04 214.01 132.45
2010-01-05 214.38 130.85
2010-01-06 210.97 130.0
2010-01-07 210.58 129.55
2010-01-08 211.98 130.85
2010-01-11 210.11 129.48
Join Optimizations
When a query joins three tables a, b, and c, and every ON clause uses a.ymd as one of the
join keys, Hive can apply an optimization where it joins all three tables in a single
MapReduce job. The optimization would also be used if b.ymd were used in both ON clauses.
Hive also assumes that the last table in the query is the largest. It attempts to buffer the
other tables and then stream the last table through, while performing joins on individual
records. Therefore, you should structure your join queries so the largest table is last.
Consider a join between stocks and dividends that makes the mistake of placing the
smaller dividends table last:
SELECT s.ymd, s.symbol, s.price_close, d.dividend
FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
WHERE s.symbol = 'AAPL';
We should switch the positions of stocks and dividends:
SELECT s.ymd, s.symbol, s.price_close, d.dividend
FROM dividends d JOIN stocks s ON s.ymd = d.ymd AND s.symbol = d.symbol
WHERE s.symbol = 'AAPL';
LEFT OUTER JOIN:
The left-outer join is indicated by adding the LEFT OUTER keywords:
hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s LEFT OUTER JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL';
OUTER JOIN Gotcha:
Recall what we said previously about speeding up queries by adding partition filters in
the WHERE clause. To speed up our previous query, we might choose to add predicates
that select on the exchange in both tables:
hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s LEFT OUTER JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL'
> AND s.exchange = 'NASDAQ' AND d.exchange = 'NASDAQ';
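1987-05-11 AAPL 77.0 0.015
1987-08-10 AAPL 48.25 0.015
1987-11-17 AAPL 35.0 0.02
1988-02-12 AAPL 41.0 0.02
1988-05-16 AAPL 41.25 0.02
The output now contains only the days on which a dividend was paid, as if we had written
an inner join. The WHERE clause is evaluated after the join is performed, so stock records
with no matching dividend, where d.exchange is NULL, are filtered out by the
d.exchange = 'NASDAQ' predicate.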
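To see why the extra predicate changes the result, here is a rough Python sketch (not HiveQL) that performs a left outer join and applies the WHERE predicates afterward. The rows and the left_outer_join helper are hypothetical, and None stands in for NULL.

```python
# Hypothetical rows: (ymd, symbol, exchange, value).
stocks = [
    ("1987-05-08", "AAPL", "NASDAQ", 79.0),
    ("1987-05-11", "AAPL", "NASDAQ", 77.0),
    ("1987-05-12", "AAPL", "NASDAQ", 75.5),
]
dividends = [("1987-05-11", "AAPL", "NASDAQ", 0.015)]

def left_outer_join(left, right):
    """Pair each left row with its matches on (ymd, symbol), or with None."""
    joined = []
    for l in left:
        matches = [r for r in right if r[:2] == l[:2]]
        for r in matches or [None]:
            joined.append((l, r))
    return joined

joined = left_outer_join(stocks, dividends)

# WHERE s.exchange = 'NASDAQ' only: unmatched stock rows survive.
where_s = [(l, r) for l, r in joined if l[2] == "NASDAQ"]

# Adding d.exchange = 'NASDAQ': rows with no dividend (r is None, i.e. NULL)
# fail the predicate, so the result looks like an inner join.
where_s_and_d = [(l, r) for l, r in where_s
                 if r is not None and r[2] == "NASDAQ"]

print(len(where_s), len(where_s_and_d))   # 3 1
```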
RIGHT OUTER JOIN:
Right-outer joins return all records in the righthand table that match the WHERE clause.
NULL is used for fields of missing records in the lefthand table.
Here we switch the places of stocks and dividends and perform a right-outer join, but
leave the SELECT statement unchanged:
hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM dividends d RIGHT OUTER JOIN stocks s ON d.ymd = s.ymd AND d.symbol = s.symbol
> WHERE s.symbol = 'AAPL';
...
1987-05-07 AAPL 80.25 NULL
1987-05-08 AAPL 79.0 NULL
1987-05-11 AAPL 77.0 0.015
1987-05-12 AAPL 75.5 NULL
1987-05-13 AAPL 78.5 NULL
FULL OUTER JOIN
Finally, a full-outer join returns all records from all tables that match the WHERE clause.
NULL is used for fields in missing records in either table.
If we convert the previous query to a full-outer join, we’ll actually get the same results,
since there is never a case where a dividend record exists without a matching stock
record:
hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM dividends d FULL OUTER JOIN stocks s ON d.ymd = s.ymd AND d.symbol = s.symbol
> WHERE s.symbol = 'AAPL';
...
1987-05-07 AAPL 80.25 NULL
1987-05-08 AAPL 79.0 NULL
1987-05-11 AAPL 77.0 0.015
1987-05-12 AAPL 75.5 NULL
1987-05-13 AAPL 78.5 NULL
LEFT SEMI-JOIN
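A left semi-join returns records from the lefthand table if records are found in the righthand
table that satisfy the ON predicates.
Most SQL dialects express this with an IN ... EXISTS construct, which the Hive versions
discussed here do not support. Instead, you use the following LEFT SEMI JOIN syntax: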
hive> SELECT s.ymd, s.symbol, s.price_close
> FROM stocks s LEFT SEMI JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol;
...
1962-11-05 IBM 361.5
1962-08-07 IBM 373.25
1962-05-08 IBM 459.5
1962-02-06 IBM 551.5
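The semantics can be sketched in a few lines of Python (not HiveQL). The rows below are partly hypothetical: the 1962-02-07 price and the duplicated dividend row are invented to show that only lefthand columns are returned and that a lefthand row appears at most once.

```python
# Partly hypothetical rows keyed by (ymd, symbol).
stocks = [
    ("1962-02-06", "IBM", 551.5),
    ("1962-02-07", "IBM", 552.0),
    ("1962-05-08", "IBM", 459.5),
]
dividends = [("1962-02-06", "IBM"), ("1962-05-08", "IBM"), ("1962-05-08", "IBM")]

# Build the set of righthand keys once; duplicates on the right collapse.
dividend_keys = {(ymd, symbol) for ymd, symbol in dividends}

# Keep a lefthand row if at least one righthand row matches its key.
semi_joined = [s for s in stocks if (s[0], s[1]) in dividend_keys]
print(semi_joined)
```

An inner join on the same data would instead emit the 1962-05-08 row twice, once per matching dividend row.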
Cartesian Product JOINs
A Cartesian product is a join where all the tuples on the left side of the join are paired
with all the tuples of the right table. If the left table has 5 rows and the right table has
6 rows, 30 rows of output will be produced:
SELECT * FROM stocks JOIN dividends;
Map-side Joins:
If all but one table is small, the largest table can be streamed through the mappers while
the small tables are cached in memory. Hive can do all the joining map-side, since it
can look up every possible match against the small tables in memory, thereby eliminating
the reduce step required in the more common join scenarios. Even on smaller
data sets, this optimization is noticeably faster than the normal join. Not only does it
eliminate reduce steps, it sometimes reduces the number of map steps, too.
SELECT /*+ MAPJOIN(d) */ s.ymd, s.symbol, s.price_close, d.dividend
FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
WHERE s.symbol = 'AAPL';
Running this query versus the original on a fast MacBook Pro laptop yielded times of
approximately 23 seconds versus 33 seconds for the original unoptimized query, which
is roughly 30% faster using our sample stock data.
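The idea behind a map-side join can be sketched in Python (not HiveQL): cache the small table in a hash table, then stream the large table past it. This is only an illustration of the technique under made-up names, not Hive's implementation. The dividend rows reuse values shown earlier, and the 1987-05-08 stock row has no dividend.

```python
from collections import defaultdict

# The small table fits in memory; the large one is streamed.
dividends = [("1987-05-11", "AAPL", 0.015), ("1987-08-10", "AAPL", 0.015)]
stocks = [
    ("1987-05-08", "AAPL", 79.0),
    ("1987-05-11", "AAPL", 77.0),
    ("1987-08-10", "AAPL", 48.25),
]

def map_side_join(large_rows, small_rows):
    """Join on (ymd, symbol) by probing an in-memory table; no shuffle or reduce."""
    # Cache the small table in a hash table keyed by the join columns.
    lookup = defaultdict(list)
    for ymd, symbol, dividend in small_rows:
        lookup[(ymd, symbol)].append(dividend)
    # Stream the large table; each row is joined independently of the others,
    # which is why every mapper can do this work on its own input split.
    for ymd, symbol, price_close in large_rows:
        for dividend in lookup.get((ymd, symbol), []):
            yield (ymd, symbol, price_close, dividend)

print(list(map_side_join(stocks, dividends)))
```

Because no row of the large table needs to meet any other row of the large table, nothing has to be shuffled to reducers.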
The /*+ MAPJOIN(d) */ hint still works, but it is deprecated as of Hive v0.7. Without the
hint, you have to set the property hive.auto.convert.join to true before Hive will attempt
the optimization. It’s false by default:
hive> set hive.auto.convert.join=true;
hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
> FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
> WHERE s.symbol = 'AAPL';
If you always want Hive to attempt this optimization, set this property
in your $HOME/.hiverc file.
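A further map-side optimization applies when the tables being joined are bucketed on the
join keys and the number of buckets in one table is a multiple of the number of buckets in
the other. However, this optimization is not turned on by default. It must be enabled by
setting the property hive.optimize.bucketmapjoin: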
set hive.optimize.bucketmapjoin=true;
If the bucketed tables actually have the same number of buckets and the data is sorted
by the join/bucket keys, then Hive can perform an even faster sort-merge join. Once
again, properties must be set to enable the optimization:
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
DISTRIBUTE BY with SORT BY:
DISTRIBUTE BY controls how map output is divided among reducers. All data that flows
through a MapReduce job is organized into key-value pairs. Hive must use this feature
internally when it converts your queries to MapReduce jobs.
By default, MapReduce computes a hash on the keys output by mappers and tries to
evenly distribute the key-value pairs among the available reducers using the hash values.
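Unfortunately, this means that when we use SORT BY, the contents of one reducer’s
output will overlap significantly with the output of the other reducers, as far as sorted
order is concerned, even though the data is sorted within each reducer’s output.
We can use DISTRIBUTE BY to ensure that the records for each stock symbol go to the same
reducer, then use SORT BY to order the data the way we want: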
hive> SELECT s.ymd, s.symbol, s.price_close
> FROM stocks s
> DISTRIBUTE BY s.symbol
> SORT BY s.symbol ASC, s.ymd ASC;
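The combined effect of the two clauses can be sketched in Python (not HiveQL). The sketch routes each row to a "reducer" by hashing the distribution key and then sorts within each reducer. zlib.crc32 is only a stand-in for the real partitioning hash, and the IBM prices and the distribute_and_sort helper are hypothetical.

```python
import zlib

def distribute_and_sort(rows, num_reducers, distribute_key, sort_key):
    """Route rows to reducers by a hash of distribute_key, then sort each reducer's rows."""
    reducers = [[] for _ in range(num_reducers)]
    for row in rows:
        # crc32 stands in for the partitioner's hash; Hive's real hash differs.
        h = zlib.crc32(str(distribute_key(row)).encode("utf-8"))
        reducers[h % num_reducers].append(row)
    return [sorted(part, key=sort_key) for part in reducers]

# (ymd, symbol, price_close) rows; the IBM prices are invented.
rows = [
    ("2010-02-02", "IBM", 125.53), ("2010-02-01", "AAPL", 194.73),
    ("2010-02-01", "IBM", 124.67), ("2010-02-02", "AAPL", 195.86),
]

parts = distribute_and_sort(
    rows, num_reducers=2,
    distribute_key=lambda r: r[1],      # DISTRIBUTE BY s.symbol
    sort_key=lambda r: (r[1], r[0]),    # SORT BY s.symbol ASC, s.ymd ASC
)
for i, part in enumerate(parts):
    print(i, part)
```

Whichever reducer a symbol lands on, all of its rows land there together, so each symbol's rows come out contiguous and in date order.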
CLUSTER BY:
In the previous example, the s.symbol column was used in the DISTRIBUTE BY clause,
and the s.symbol and the s.ymd columns in the SORT BY clause. Suppose that the same
columns are used in both clauses and all columns are sorted in ascending order (the
default). In this case, the CLUSTER BY clause is a shorthand way of expressing the same
query.
For example, let’s modify the previous query to drop sorting by s.ymd and use CLUSTER
BY on s.symbol:
hive> SELECT s.ymd, s.symbol, s.price_close
> FROM stocks s
> CLUSTER BY s.symbol;
2010-02-08 AAPL 194.12
2010-02-05 AAPL 195.46
2010-02-04 AAPL 192.05
2010-02-03 AAPL 199.23
2010-02-02 AAPL 195.86
2010-02-01 AAPL 194.73
2010-01-29 AAPL 192.06
2010-01-28 AAPL 199.29
Casting:
Hive performs some implicit conversions, called casts, of numeric data types as needed,
for example when comparing two numbers of different types.
Here we discuss the cast() function that allows you to explicitly convert a value of one
type to another.
Recall our employees table uses a FLOAT for the salary column. Now, imagine for a
moment that STRING was used for that column instead. How could we work with the
values as FLOATs?
The following example casts the values to FLOAT before performing a comparison:
SELECT name, salary FROM employees
WHERE cast(salary AS FLOAT) < 100000.0;
The syntax of the cast function is cast(value AS TYPE). What would happen in the
example if a salary value was not a valid string for a floating-point number? In this
case, Hive returns NULL.
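The NULL-on-failure behavior can be mimicked in Python (not HiveQL), with None standing in for NULL. The cast_as_float helper and the rows are hypothetical, and Python's float() accepts a few strings, such as "inf", that Hive may treat differently.

```python
def cast_as_float(value):
    """Mimic cast(value AS FLOAT) on a string: None (NULL) if it is not a number."""
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

# Hypothetical (name, salary-as-string) rows.
employees = [("John Doe", "100000.0"), ("Mary Smith", "80000.0"), ("Bad Row", "n/a")]

# WHERE cast(salary AS FLOAT) < 100000.0; a comparison with NULL is not true,
# so rows whose salary cannot be parsed are dropped.
selected = [
    (name, salary) for name, salary in employees
    if cast_as_float(salary) is not None and cast_as_float(salary) < 100000.0
]
print(selected)   # [('Mary Smith', '80000.0')]
```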
Casting BINARY Values:
The new BINARY type introduced in Hive v0.8.0 only supports casting BINARY to
STRING. However, if you know the value is a number, you can nest cast() invocations,
as in this example where column b is a BINARY column:
SELECT (2.0*cast(cast(b as string) as double)) from src;
Queries that Sample Data
For very large data sets, sometimes you want to work with a representative sample of
a query result, not the whole thing. Hive supports this goal with queries that sample
tables organized into buckets.
In the following example, assume the numbers table has one number column with
values 1−10.
We can sample using the rand() function, which returns a random number. In the first
two queries, two distinct numbers are returned for each query. In the third query, no
results are returned:
hive> SELECT * from numbers TABLESAMPLE(BUCKET 3 OUT OF 10 ON rand()) s;
2
4
hive> SELECT * from numbers TABLESAMPLE(BUCKET 3 OUT OF 10 ON rand()) s;
7
10
hive> SELECT * from numbers TABLESAMPLE(BUCKET 3 OUT OF 10 ON rand()) s;
If we bucket on a column instead of rand(), then identical results are returned on multiple
runs:
hive> SELECT * from numbers TABLESAMPLE(BUCKET 3 OUT OF 10 ON number) s;
2
hive> SELECT * from numbers TABLESAMPLE(BUCKET 5 OUT OF 1
4
hive> SELECT * from numbers TABLESAMPLE(BUCKET 3 OUT OF 10 ON number) s;
2
The denominator in the bucket clause represents the number of buckets into which
data will be hashed. The numerator is the bucket number selected:
hive> SELECT * from numbers TABLESAMPLE(BUCKET 1 OUT OF 2 ON number) s;
2
4
6
8
10
hive> SELECT * from numbers TABLESAMPLE(BUCKET 2 OUT OF 2 ON number) s;
1
3
5
7
9
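The bucket arithmetic can be sketched in Python (not HiveQL). The sketch assumes that the hash of an integer column is the integer itself, which is consistent with the outputs shown above. The tablesample function is a hypothetical helper, not a Hive API.

```python
def tablesample(values, bucket, out_of):
    """Return the values that fall in 1-based bucket `bucket` of `out_of` buckets.

    Assumes an integer column whose hash is the value itself.
    """
    return [v for v in values if (v & 0x7FFFFFFF) % out_of == bucket - 1]

numbers = list(range(1, 11))

print(tablesample(numbers, 3, 10))   # [2]
print(tablesample(numbers, 1, 2))    # [2, 4, 6, 8, 10]
print(tablesample(numbers, 2, 2))    # [1, 3, 5, 7, 9]
```

With ten values and ten buckets most buckets hold a single value, whereas two buckets split the values into evens and odds.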
Block Sampling:
Hive offers another syntax for sampling a percentage of blocks of an input path as an
alternative to sampling based on rows:
hive> SELECT * FROM numbersflat TABLESAMPLE(0.1 PERCENT) s;
Percentage-based sampling offers a variable to control the seed information for
block-based tuning. Different seeds produce different samples:
<property>
<name>hive.sample.seednumber</name>
<value>0</value>
<description>A number used for percentage sampling. By changing this
number, user will change the subsets of data sampled.</description>
</property>
Input Pruning for Bucket Tables:
From a first look at the TABLESAMPLE syntax, an astute user might come to the conclusion
that the following query would be equivalent to the TABLESAMPLE operation:
hive> SELECT * FROM numbersflat WHERE number % 2 = 0;
2
4
6
8
10
It is true that for most table types, sampling scans through the entire table and selects
every Nth row. However, if the columns specified in the TABLESAMPLE clause match the
columns in the CLUSTERED BY clause, TABLESAMPLE queries only scan the required hash
partitions of the table:
hive> CREATE TABLE numbers_bucketed (number int) CLUSTERED BY (number) INTO 3 BUCKETS;
hive> SET hive.enforce.bucketing=true;
hive> INSERT OVERWRITE TABLE numbers_bucketed SELECT number FROM numbers;
hive> dfs -ls /user/hive/warehouse/mydb.db/numbers_bucketed;
/user/hive/warehouse/mydb.db/numbers_bucketed/000000_0
/user/hive/warehouse/mydb.db/numbers_bucketed/000001_0
/user/hive/warehouse/mydb.db/numbers_bucketed/000002_0
hive> dfs -cat /user/hive/warehouse/mydb.db/numbers_bucketed/000001_0;
1
7
10
4
Because this table is clustered into three buckets, the following query can be used to
sample only one of the buckets efficiently:
hive> SELECT * FROM numbers_bucketed TABLESAMPLE (BUCKET 2 OUT OF 3 ON NUMBER) s;
1
7
10
4
UNION ALL
UNION ALL combines two or more tables. Each subquery of the union query must produce
the same number of columns, and for each column, its type must match all the
column types in the same position. For example, if the second column is a FLOAT, then
the second column of all the other query results must be a FLOAT.
Here is an example that merges log data:
SELECT log.ymd, log.level, log.message
FROM (
SELECT l1.ymd, l1.level,
l1.message, 'Log1' AS source
FROM log1 l1
UNION ALL
SELECT l2.ymd, l2.level,
l2.message, 'Log2' AS source
FROM log2 l2
) log
SORT BY log.ymd ASC;
UNION ALL may also be used when the subqueries select from the same source table. Logically,
the same results could be achieved with a single SELECT and WHERE clause. This technique
increases readability by breaking up a long complex WHERE clause into two or more UNION
ALL queries. However, unless the source table is indexed, the query will have to make
multiple passes over the same source data. For example:
FROM (
FROM src SELECT src.key, src.value WHERE src.key < 100
UNION ALL
FROM src SELECT src.* WHERE src.key > 110
) unioninput
INSERT OVERWRITE DIRECTORY '/tmp/union.out' SELECT unioninput.*;