Pig Queries Parsing JSON on Amazon’s Elastic MapReduce Using S3 Data

I know the title of this post is a mouthful, but that’s the fun of pushing the envelope of existing technologies. What I am looking to do is take my log data stored on S3 (which is in compressed JSON format) and run queries against it. I wanted to leverage the power of Hadoop’s distributed data processing framework without having to learn everything about setting up Hadoop, without having to learn how to write map reduce jobs, and … (this could go on for a while, so I’ll just stop here). For all these reasons, I chose to use Amazon’s Elastic MapReduce infrastructure and Pig.

Describing all these technologies is beyond the scope of this article. I will talk you through how I was able to do all this with a little help from the Pig community and a lot of late nights. I will also provide an example Pig script detailing a little about how I deal with my logs (which are admittedly slightly abnormal). I will also be making some assumptions here. Each time I make a large assumption, I will let you know.

First off, I am going to assume that you have an Amazon Web Services account (AWS) and you have also signed up for Elastic Map Reduce (EMR). For all this, I followed the instructions in this video by Ian @ AWS to get me going. Now SSH into the machine so we can get started.

As of the time of this writing, EMR is using Hadoop 0.20 and Pig 0.6. Everything I am going to talk about is for Pig 0.6. With any luck, upgrades to Pig will have taken a lot of this into account. Once you are on the EMR master host, type the following commands to download elephant-bird. We are going to use it to build a jar that will parse our JSON (big thanks to Dmitriy for all the help here). Note: the reason we are pulling with wget as opposed to cloning with git is that we want the jsonloader branch and this is just easier.

$ mkdir git && mkdir pig-jars
$ cd git && wget --no-check-certificate https://github.com/kevinweil/elephant-bird/tarball/eb1.2.1_with_jsonloader
$ tar xzf eb1.2.1_with_jsonloader && cd elephant-bird
$ cp lib/google-collect-1.0.jar ~/pig-jars && cp lib/json-simple-1.1.jar ~/pig-jars
$ ant nonothing
$ cd build/classes
$ jar -cf ../elephant-bird-1.2.1-SNAPSHOT.jar com
$ cp ../*.jar ~/pig-jars

At this point we should have 3 jars in the pig-jars directory. I created an S3 bucket for myself and put the jars in there so I only have to do the compilation once. From here on in, I will be referencing those jars using my s3 bucket. If you like how my logs are organized, then I strongly recommend checking out Cloudera Flume for log aggregation. <shameless plug>I also wrote a blog post here on getting it going.</shameless plug>

Another item worthy of note is that I store all my logs in gzip format. Although this isn’t the best format for Hadoop in the long run because gzip files can’t be split into chunks, it’s what I used. I had trouble getting everything going because Pig doesn’t decompress gzipped files when running in local mode. Please learn from that mistake of mine.
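If you want to sanity-check things in local mode, one workaround is to decompress the gzipped logs yourself first. Here is a rough sketch of that idea in Python; the filenames and fields are invented for illustration, and none of this is part of Pig itself:

```python
import gzip
import json

# Write a tiny gzipped sample log (one JSON object per line) so the
# sketch is self-contained. Filenames and fields are made up.
records = [{"amount": "1.5", "widget_value": "42"}, {"id": "abc123"}]
with gzip.open("serverlog.sample.gz", "wt") as gz:
    for rec in records:
        gz.write(json.dumps(rec) + "\n")

# Decompress it into plain text that Pig's local mode can read,
# sanity-checking that every line is valid JSON along the way.
with gzip.open("serverlog.sample.gz", "rt") as compressed, \
        open("serverlog.sample.json", "w") as plain:
    for line in compressed:
        json.loads(line)  # raises if the line is not valid JSON
        plain.write(line)
```

Pig in mapreduce mode handles the decompression for you; this is only for local testing.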

Now let’s get into the code a little. The first thing we do is register all the jars necessary to parse JSON, using the jars that we put directly on S3. Then we load the JSON into maps using JsonLoader(). You have to use the full path to it (which is listed in the code sample). Now the “interesting” thing about my log files is that they have 3 distinct types of log lines in them: types i, b, and c. Each log line type has a different meaning, so I sort them into 3 groups with the SPLIT command and some conditionals.

Now that I have my data broken out into the 3 buckets, I can start doing what I want with them. Let’s say that in log type i there is a widget_value, and that widget_value is a string containing a number. To show the top 5 most common values of widget_value, I pull out all instances of widget_value in i. Then I group those values together and count them (thus getting aggregate values). And finally I sort them in descending order and show only the top 5.

-- REGISTER the parsing jars
REGISTER s3://$bucket/jars/PIG/google-collect-1.0.jar;
REGISTER s3://$bucket/jars/PIG/json-simple-1.1.jar;
REGISTER s3://$bucket/jars/PIG/elephant-bird-1.2.1-SNAPSHOT.jar;                                  

-- Load up the JSON and split it into the three log types: b, c and i
json = LOAD 's3://$bucket/logs/2011/02/22/1800/serverlog.*' USING com.twitter.elephantbird.pig.load.JsonLoader();
SPLIT json INTO i IF (FLOAT) $0#'amount' > 0, c IF $0#'id' IS NOT NULL, b IF $0#'response' IS NOT NULL;

wv_i_only = FOREACH i GENERATE (CHARARRAY) $0#'widget_value' AS wv;
wv_i_count = FOREACH (GROUP wv_i_only BY $0) GENERATE $0, COUNT($1) AS i_cnt;
wv_i_sorted_count = LIMIT(ORDER wv_i_count BY i_cnt DESC) 5;
DUMP wv_i_sorted_count;
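To make the logic concrete, here is a plain-Python sketch of what that script computes. The sample records are invented for illustration, and this only stands in for what Pig does across the cluster:

```python
from collections import Counter

# Invented sample records standing in for the JSON log lines.
records = [
    {"amount": "2.0", "widget_value": "7"},
    {"amount": "1.0", "widget_value": "7"},
    {"amount": "3.5", "widget_value": "9"},
    {"id": "abc"},        # a type-c line
    {"response": "200"},  # a type-b line
]

# SPLIT: type i has a positive amount, c has an id, b has a response.
i = [r for r in records if float(r.get("amount", 0)) > 0]
c = [r for r in records if r.get("id") is not None]
b = [r for r in records if r.get("response") is not None]

# GROUP/COUNT/ORDER/LIMIT: top 5 widget_values among type-i lines.
top5 = Counter(str(r["widget_value"]) for r in i).most_common(5)
print(top5)  # prints [('7', 2), ('9', 1)]
```

Note that, as with Pig’s SPLIT, a record lands in every bucket whose condition it satisfies.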

Last thing I want to share is some tips on getting everything going with Pig:

  • Start small and stay small until everything is working
  • Use subsets of your data for which you have a good idea of what the results should be before you run your queries
  • Step through your queries to ensure each step is doing what you think it’s doing
  • Cast your data types to avoid weird behavior; a map doesn’t always leave your values in the type you want/expect

And I can’t forget to say thanks for all the help to the people who hang out in #hadoop-pig on irc.freenode.net.

Posted in Hadoop.

  • http://inkdroid.org/ Ed Summers

    Is that really supposed to read:

    json = LOAD 's3://$bucket/logs/2011/02/22/1800/serverlog.*' USING com.twitter.elephantbird.PIG.LOAD.JsonLoader();

    I get this error:

    2011-03-06 12:00:01,857 [main] ERROR org.apache.pig.tools.grunt.Grunt – ERROR 1070: Could not resolve com.twitter.elephantbird.PIG.LOAD.JsonLoader using imports: [, org.apache.pig.builtin., org.apache.pig.impl.builtin.]
    Details at logfile: /home/ed/Apps/hadoop-0.20.2/pig_1299429241643.log

  • http://eric.lubow.org Eric Lubow

    Nope, it should read:
    json = LOAD 's3://$bucket/logs/2011/02/22/1800/serverlog.*' USING com.twitter.elephantbird.pig.load.JsonLoader(); -- Note the lowercase 'pig' and 'load'

    It was capitalized by the syntax highlighter because those are typically keywords. Since Java is case sensitive, you received an import error.

  • http://inkdroid.org/ Ed Summers

    Thanks. That’s what I figured. Unfortunately, when I load with caps removed on the package name I get a java.lang.IncompatibleClassChangeError in my pig log:

    Pig Stack Trace
    —————
    ERROR 2998: Unhandled internal error. Implementing class

    java.lang.IncompatibleClassChangeError: Implementing class
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:634)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:634)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:277)
    at java.net.URLClassLoader.access$000(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:212)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:205)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:321)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:266)
    at com.twitter.elephantbird.pig.load.JsonLoader.<init>(Unknown Source)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:264)
    at org.apache.pig.impl.PigContext.resolveClassName(PigContext.java:426)
    at org.apache.pig.impl.PigContext.instantiateFuncFromSpec(PigContext.java:456)

    at org.apache.pig.impl.logicalLayer.parser.QueryParser.NonEvalFuncSpec(QueryParser.java:5594)
    at org.apache.pig.impl.logicalLayer.parser.QueryParser.LoadClause(QueryParser.java:1577)
    at org.apache.pig.impl.logicalLayer.parser.QueryParser.BaseExpr(QueryParser.java:1366)
    at org.apache.pig.impl.logicalLayer.parser.QueryParser.Expr(QueryParser.java:1013)
    at org.apache.pig.impl.logicalLayer.parser.QueryParser.Parse(QueryParser.java:800)
    at org.apache.pig.impl.logicalLayer.LogicalPlanBuilder.parse(LogicalPlanBuilder.java:63)
    at org.apache.pig.PigServer$Graph.parseQuery(PigServer.java:1601)
    at org.apache.pig.PigServer$Graph.registerQuery(PigServer.java:1551)
    at org.apache.pig.PigServer.registerQuery(PigServer.java:523)
    at org.apache.pig.tools.grunt.GruntParser.processPig(GruntParser.java:868)
    at org.apache.pig.tools.pigscript.parser.PigScriptParser.parse(PigScriptParser.java:388)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:165)
    at org.apache.pig.tools.grunt.GruntParser.parseStopOnError(GruntParser.java:141)
    at org.apache.pig.tools.grunt.Grunt.run(Grunt.java:76)
    at org.apache.pig.Main.run(Main.java:465)
    at org.apache.pig.Main.main(Main.java:107)
    ================================================================================

    You haven’t seen that before have you?

  • http://eric.lubow.org Eric Lubow

    I don’t know what that is. The only thing I can suggest is to make sure you are using Pig 0.6 on Amazon EMR and the proper branch of Elephant-Bird. If you are definitely using the right versions of everything and you have REGISTER’d the jar prior to loading the JSON, then drop into #hadoop-pig on irc.freenode.net and see if someone there can help you out.

  • http://inkdroid.org/ Ed Summers

    Oh, I’m on Pig v0.8.0; maybe that’s the problem?

  • http://profiles.google.com/dharmapu Ravinder Reddy

    This is useful. In addition, you can do this without using S3. Thanks for the blog.
    We are using Twitter data with the Pig JsonLoader.

  • http://simoncast.blogspot.com Simon Cast

    Nice post. I’ve been meaning to get JsonLoader() going as we store a lot of the data as JSON.

    Would it be possible to compile the JARs on a mac machine and then use them in EMR or do the JAR files need to be compiled on the EMR master to work?

  • Robert Slifka

    Hey man, the code highlighter is having an issue and the code isn’t visible. Would be great to have a look at it :)

  • http://eric.lubow.org Eric Lubow

    Thanks. I forgot I upgraded the plugin and it clobbered my code colorer. I threw it up on Github in case you (or anyone else) may want it. With any luck they will include it in the next release. https://github.com/elubow/geshi-pig

  • Alexander Berezovskiy

    Also you can parse multi-level JSON with https://github.com/a-b/elephant-bird


  • Andy Powell

    Is there a way to run this with the current (11/2012) EMR Pig, version 0.9.1?