Hive - HQL query over MapReduce
Josh Reuben

Overview

Developed by Facebook

Hive is a data warehousing framework on top of MapReduce over HDFS; HiveQL is its SQL-like query language.

Hive converts a HiveQL query into a series of jobs for execution on a Hadoop cluster.

Organizes HDFS data into tables - attaching structure.

Schema on Read Versus Schema on Write - doesn’t verify the data when it is loaded, but rather when a query is issued.

full-table scans are the norm and a table update is achieved by transforming the data into a new table.

HDFS does not provide in-place file updates - changes from CUD ops are stored in small delta files - periodically merged into base table files by background MapReduce jobs

Hive indexes - speed up queries. 2 index types: compact and bitmap. Pluggable. Compact indexes store HDFS block numbers of each value, rather than each file offset. Bitmap indexes use compressed bitsets to efficiently store rows that a particular value appears in.
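A sketch of the index DDL (the index name idx_price and the use of tableX/price here are illustrative; 'COMPACT' or 'BITMAP' selects the handler, and WITH DEFERRED REBUILD postpones building the index until an ALTER INDEX ... REBUILD is issued):

CREATE INDEX idx_price ON TABLE tableX (price)
AS 'COMPACT' WITH DEFERRED REBUILD;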

Metadata (eg table schema, views) - stored in metastore DB


An Example

Create a table with 3 typed columns; each row in the data file is tab-delimited text.

CREATE TABLE tableX (age STRING, price INT, category INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t';

populate Hive with the data.

LOAD DATA LOCAL INPATH 'path/to/blah.txt'
OVERWRITE INTO TABLE tableX;

The OVERWRITE keyword in the LOAD DATA statement deletes any existing files in the table's directory, rather than appending to them.

Now that data is in Hive, can run a query against it:

SELECT age, MAX(price) FROM tableX
WHERE price != 1000 AND category IN (0, 1, 4)
GROUP BY age;

The Hive Shell

launch Hive HiveQL shell:

% hive

hive>

Commands are terminated with a semicolon

SHOW TABLES;

Noninteractive mode: use the -f option to run a script:

hive -f script.q

-e option - specify commands inline:

% echo 'X' > /tmp/blah.txt
% hive -e "CREATE TABLE blah (value STRING); \
  LOAD DATA LOCAL INPATH '/tmp/blah.txt' \
  OVERWRITE INTO TABLE blah"

suppress messages using -S option:

% hive -S -e 'SELECT * FROM blah'

run commands on the host OS - using a ! prefix to the command

access HDFS using the dfs command.
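For example, a minimal sketch of both from the Hive shell:

hive> ! ls /tmp;
hive> dfs -ls /user/hive/warehouse;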

Hive Data Types


Primitive: BOOLEAN TINYINT SMALLINT INT BIGINT FLOAT DOUBLE DECIMAL STRING VARCHAR CHAR BINARY TIMESTAMP DATE
Complex: ARRAY STRUCT MAP UNION

Complex types support nesting; they are parameterized using angle-bracket notation:

CREATE TABLE complex (
     x1 ARRAY<INT>,
     x2 MAP<STRING, INT>,
     x3 STRUCT<j:STRING, k:INT, l:DOUBLE>,
     x4 UNIONTYPE<STRING, INT>
);

hive> SELECT x1[0], x2['b'], x3.j, x4 FROM complex;

Operators and Functions

retrieve a list of functions: SHOW FUNCTIONS.

get brief usage instructions for a particular function - DESCRIBE FUNCTION <function_name>

Conversions - perform explicit type conversion using CAST.
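A minimal sketch using the earlier tableX columns (a cast that fails, e.g. CAST('X' AS INT), returns NULL):

SELECT CAST(age AS INT), CAST(price AS DOUBLE) FROM tableX;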

Tables

Composed of stored data (typically HDFS, can be S3, local FS) + associated schema metadata (in Metastore RDBMS)

Multiple database/schema support - CREATE DATABASE dbname, USE dbname, DROP DATABASE dbname.

Creating Tables and Loading Data

CREATE TABLE tableX (blah STRING);
LOAD DATA INPATH '/user/joshr/data.txt' INTO TABLE tableX;

Tables are stored as directories under Hive’s warehouse directory: hive.metastore.warehouse.dir defaults to hdfs:///user/hive/warehouse.

Loaded local files are placed in the warehouse directory. Note that fs.defaultFS defaults to file:/// - the local filesystem.

A load is very fast but does not validate the data against the schema - mismatches are only checked at query time: the query returns NULL for a missing field.

Alternative: external tables: outside warehouse directory:

CREATE EXTERNAL TABLE tableX (colX STRING)
LOCATION '/user/joshr/tableX';
LOAD DATA INPATH '/user/joshr/data.txt' INTO TABLE tableX;

can create the data lazily after creating the table. DROP only deletes metadata.

LOAD DATA - import data into a Hive table (or partition) by copying or moving files to the table’s directory.

populate a table with data from another Hive table using INSERT or at creation time using CREATE TABLE...AS SELECT.

to import data from a relational database directly into Hive, use Sqoop
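A hedged sketch of such an import (the JDBC URL and table name are placeholders; --hive-import tells Sqoop to create and load a corresponding Hive table):

% sqoop import --connect jdbc:mysql://dbhost/salesdb \
    --table widgets --hive-import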

Altering Tables

Hive's flexible schema-on-read approach permits a table's definition to change after the table has been created.

rename a table using ALTER TABLE: moves underlying table directory so that it reflects the new name.

ALTER TABLE source RENAME TO target;

can change definition for columns, add new columns, or replace all existing columns:

ALTER TABLE target ADD COLUMNS (colA STRING);

The new column colA is added after the existing (nonpartition) columns. The datafiles are not updated, so queries will return null for all values of colA

Hive does not permit updating existing column values in place – the underlying files need to be updated by another mechanism, so it is often more efficient to create a new table that defines the new columns and populate it using a SELECT statement.

Changing a column’s metadata, such as a column’s name or data type, is more straightforward, assuming that the old data type can be interpreted as the new data type.
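For example, renaming the colA column added above while keeping its STRING type (ALTER TABLE ... CHANGE takes the old name, the new name, and the type):

ALTER TABLE target CHANGE colA colA_renamed STRING;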

LIKE - create a new, empty table with the same schema as another table:

CREATE TABLE tableY LIKE tableX;

Dropping Tables

DROP TABLE - delete data + metadata. For external tables, only metadata is deleted

TRUNCATE TABLE - delete all the data in a table but keep the table definition (doesn't work for external tables - use dfs -rmr from the Hive shell instead)

TRUNCATE TABLE tableX;

Partitions and Buckets

A table may be partitioned in multiple dimensions, and partitions may be further subdivided into buckets; partitioning enables efficient queries restricted by partition value. Partitions are defined at table creation time using the PARTITIONED BY clause:

CREATE TABLE tableX (timestamp BIGINT, line STRING)
PARTITIONED BY (datetime STRING, category STRING);

When loading data into a partitioned table, partition values are specified explicitly:

LOAD DATA LOCAL INPATH 'path/to/file'
INTO TABLE tableX
PARTITION (datetime='2015-01-01', category='A');

At the filesystem level, partitions are nested subdirectories of the table directory:

/user/hive/warehouse/tableX
├── datetime=2015-01-01/
│   ├── category=A/
│   │   ├── file1
│   │   └── file2
│   └── category=B/
│       └── file3
└── datetime=2015-01-02/
    ├── category=A/
    │   └── file4
    └── category=B/
        ├── file5
        └── file6

SHOW PARTITIONS:

SHOW PARTITIONS tableX;
   datetime=2015-01-01/category=A
   datetime=2015-01-01/category=B
   datetime=2015-01-02/category=A
   datetime=2015-01-02/category=B

Use partition columns in SELECT statements - Hive performs input pruning to scan only the relevant partitions, enabling more efficient queries, as in the sketch below.
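A minimal sketch against the partitioned table above; only the datetime=2015-01-01/category=A subdirectory is scanned (partition column values come from the directory names, not the datafiles):

SELECT timestamp, line
FROM tableX
WHERE datetime = '2015-01-01' AND category = 'A';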

Bucketing imposes extra structure on a table to support map-side joins and efficient sampling queries over a subset of the dataset. Specify the columns to bucket on and the number of buckets:

 CREATE TABLE tableX (id INT, name STRING)
 CLUSTERED BY (id) INTO 5 BUCKETS;

Physically, each bucket is just a file in the table (or partition) directory.

Map-side join: if two tables are bucketed in the same way on the join column, a mapper need only retrieve the relevant bucket of the other table. Sorting the data within each bucket makes the join an efficient merge sort:

CREATE TABLE tableX (id INT, name STRING)
CLUSTERED BY (id) SORTED BY (id ASC) INTO 5 BUCKETS;

sample a table using TABLESAMPLE clause:

SELECT * FROM tableX
TABLESAMPLE(BUCKET 1 OUT OF 5 ON id);

Storage Formats

file format - dictates the container format for fields in a row.

row format - how rows, and fields in a row, are stored, defined by a SerDe (Serializer-Deserializer)

When querying / inserting a table, a SerDe will deserialize / serialize a row of data between file bytes & Hive internal objects

The simplest format is a plain-text file - row-oriented / column-oriented binary formats have advantages over this.

Delimited plain-text files (default)

For a table created with no ROW FORMAT or STORED AS clauses, the default format is delimited text with one row per line.

Default delimiters:

  • field - ^A (Ctrl-A), not the tab char

  • collection item - ^B - delimit items in an ARRAY or STRUCT, or in key-value pairs in a MAP.

  • map key - ^C - delimit key and value in a MAP.

  • Rows in a table - newline char.

For an array of arrays, the delimiters for the outer array are ^B chars, but for the inner array they are ^C chars.

To check which delimiters are used for a particular nested structure:

CREATE TABLE tableX AS
SELECT array(array(1, 2), array(3, 4)) FROM tableY;

and then use hexdump on output file.

Hive supports 8 levels of delimiters, corresponding to ASCII codes 1-8

CREATE TABLE ...
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

octal form of delimiter characters can be used — 001 for ^A.

Internally, Hive uses a SerDe called LazySimpleSerDe for the delimited format, together with the line-oriented MapReduce text input and output formats - not compact, but simple.


Binary storage formats

Specified with the STORED AS clause in the CREATE TABLE statement - ROW FORMAT is not specified, since the format is controlled by the underlying binary file format. Native support for SEQUENCEFILE / AVRO / PARQUET / RCFILE / ORC.

row-oriented formats (SequenceFile, Avro):

SET hive.exec.compress.output=true;
SET avro.output.codec=snappy;
CREATE TABLE tableX STORED AS AVRO;

column-oriented formats (Parquet, RCFile, ORCFile)

CREATE TABLE tableX STORED AS PARQUET
AS SELECT * FROM tableY;


Storage handlers

Used for storage systems that Hive cannot access natively, such as HBase. Specified using a STORED BY clause, instead of the ROW FORMAT and STORED AS clauses.
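A minimal sketch of an HBase-backed table (the table name, the cf column family, and the column mapping are illustrative; the HBaseStorageHandler class ships with Hive's HBase integration):

CREATE TABLE hbase_tableX (id INT, value STRING)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ('hbase.columns.mapping' = ':key,cf:value');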


Inserts

Add new rows in bulk to an existing table using an INSERT statement, which adds new data files to the table's directory:

INSERT OVERWRITE TABLE target
SELECT colA, colB FROM source; 

For partitioned tables:

INSERT OVERWRITE TABLE target
PARTITION (datetime='2015-01-01')
SELECT colA, colB FROM source; 

To append to a table rather than replace its contents, use INSERT INTO TABLE, as sketched below.
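A minimal sketch of the append form, reusing the source/target tables above:

INSERT INTO TABLE target
SELECT colA, colB FROM source;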

specify the partition dynamically: dynamic partition insert.

INSERT OVERWRITE TABLE target
PARTITION (datetime)
SELECT colA, colB, datetime FROM source;
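Depending on the Hive version and configuration, dynamic partitioning may need to be enabled first; a hedged sketch of the commonly used settings:

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;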

Multitable insert - invert the INSERT statement so that the FROM clause comes first:

FROM source
INSERT OVERWRITE TABLE tableX
  SELECT colA, colB
INSERT OVERWRITE TABLE tableY
  SELECT colC, colD;

It is possible to have multiple INSERT clauses in the same query; this is more efficient than multiple INSERT statements, because the source table needs to be scanned only once to produce the multiple disjoint outputs.


CREATE TABLE...AS SELECT

It is convenient to store the output of a Hive query in a new table for further processing steps, as sketched below.
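A minimal sketch (the table and column names are placeholders):

CREATE TABLE target
AS
SELECT colA, colB
FROM source;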

Sorting and Aggregating

ORDER BY - produces a totally sorted (global) result, but uses a single reducer to do so, making it inefficient for large datasets

SORT BY - produces a sorted file per reducer

DISTRIBUTE BY - control which reducer a particular row goes to for some subsequent aggregation

CLUSTER BY - shorthand for specifying both, if the columns for SORT BY & DISTRIBUTE BY are the same

FROM tableX SELECT age, price
DISTRIBUTE BY age SORT BY age ASC, price DESC;
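When distributing and sorting on the same column (just age here), CLUSTER BY is the equivalent shorthand:

FROM tableX SELECT age, price CLUSTER BY age;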

MapReduce External Scripts

TRANSFORM, MAP, and REDUCE clauses - invoke an external script or program from Hive.

use a script as follows:

ADD FILE /~/is_good_category.py;    -- register script with Hive
FROM tableX
SELECT TRANSFORM(age, price, category)
USING 'is_good_category.py' AS age, price;


Registering the script with Hive (ADD FILE) ships the file to the Hadoop cluster via the distributed cache.

input fields are passed as a tab-separated string

MAP and REDUCE keywords – alternate nested form for the query:

   FROM (
     FROM tableX
     MAP age, price, category
     USING 'is_good_category.py'
     AS age, price) map_output
   REDUCE age, price
   USING 'max_price_reduce.py'
   AS age, price;

Joins

SELECT tableX.*, tableY.* FROM tableX JOIN tableY ON (tableX.id = tableY.id);

Hive only supports equijoins - only equality comparisons can be used in the join predicate

Join on multiple columns in the join predicate by specifying a series of expressions, separated by the AND keyword.

alternative syntax: list join tables in FROM clause and specify join condition in WHERE clause:

SELECT tableX.*, tableY.* FROM tableX, tableY WHERE tableX.id = tableY.id;

Join multiple tables by supplying additional JOIN...ON... clauses. A single join is implemented as a single MapReduce job, but multiple joins can be performed in less than one MapReduce job per join if the same column is used in the join condition. See how many MapReduce jobs Hive will use by prefixing the query with the EXPLAIN keyword: the query execution plan shows the AST plus a stage dependency graph. Verbose: EXPLAIN EXTENDED. Hive's rule-based query optimizer has been replaced by a cost-based optimizer.

EXPLAIN SELECT tableX.*, tableY.* FROM tableX JOIN tableY ON (tableX.id = tableY.id);

Outer joins - find nonmatches as NULLs: LEFT / RIGHT / FULL OUTER JOIN.
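For example, a left outer join that keeps rows of tableX with no match in tableY (the tableY columns come back as NULLs):

SELECT tableX.*, tableY.* FROM tableX LEFT OUTER JOIN tableY ON (tableX.id = tableY.id);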

Semi joins: LEFT SEMI JOIN is an alternative syntax for WHERE x IN (subquery). Note: the subquery's columns cannot appear in the parent query's SELECT.

SELECT * FROM tableY WHERE tableY.id IN (SELECT id FROM tableX);
-- alternative syntax:
SELECT * FROM tableY LEFT SEMI JOIN tableX ON (tableX.id = tableY.id);

Map joins - Hive can load a small table into memory to perform the join in the mapper; this can take advantage of bucketed tables:

SET hive.optimize.bucketmapjoin=true;
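A sketch of the explicit hint form, which asks Hive to load tableY into memory for a map-side join (newer Hive versions can also convert joins automatically via hive.auto.convert.join):

SELECT /*+ MAPJOIN(tableY) */ tableX.*, tableY.*
FROM tableX JOIN tableY ON (tableX.id = tableY.id);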

User-Defined Functions

plug in processing code and invoke it from a Hive query.

UDFs have to be written in Java; for other languages, use SELECT TRANSFORM to stream data through a user-defined MapReduce script.

3 types of UDF in Hive:

  • UDF - operates on a single row → produces a single output row

  • UDAF -aggregate : operates on multiple input rows → single output row.

  • UDTF -table: operates on a single row and produces multiple rows — a table:

CREATE TABLE tableX (colA ARRAY<INT>)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002';

the explode UDTF emits a row for each entry in an array:

SELECT explode(colA) AS colB FROM tableX;
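To call a custom Java UDF it first has to be registered in the session; a minimal sketch, assuming a hypothetical jar, class, column, and table name:

ADD JAR /path/to/my-hive-udfs.jar;
CREATE TEMPORARY FUNCTION strip AS 'com.example.hive.Strip';
SELECT strip(colX) FROM tableY;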

Configuration

hive-site.xml

Override the configuration directory with the --config option of the hive command:

% hive --config /Users/joshr/dev/hive-conf

Set properties on a per-session basis by passing the -hiveconf option to the hive command:

   % hive -hiveconf fs.defaultFS=hdfs://localhost \
     -hiveconf mapreduce.framework.name=yarn \
     -hiveconf yarn.resourcemanager.address=localhost:8032

Change settings from within a session using the SET command:

hive> SET hive.enforce.bucketing=true;

SET -v - list all properties in the system

Transactions and table/partition-level locking need to be explicitly enabled.

Locks are managed transparently using ZooKeeper - view them with SHOW LOCKS.

Execution engines - Hive was originally written to use MapReduce as its execution engine; it is also possible to run Hive using the Tez or Spark (DAG) engines, which avoid replication overhead by writing intermediate output to local disk or memory rather than HDFS. The hive.execution.engine property defaults to mr (for MapReduce):

SET hive.execution.engine=spark;

Logging

  • error log on the local filesystem at ${java.io.tmpdir}/${user.name}/hive.log.

  • Hadoop’s MapReduce task logs are also a useful resource

% hive -hiveconf hive.log.dir='/tmp/${user.name}'

% hive -hiveconf hive.root.logger=DEBUG,console 
