Leveraging Nutch with Infinit.e, MongoDB and Elasticsearch

Apr 15, 2013
Christopher Morgan

Overview

At IKANOW as you might already know we are fans of MongoDB and Elasticsearch as they make up very important pieces of our Infinit.e platform. What you might not know is that there are integration points where tools such as Apache Nutch can be integrated with the Infinit.e platform to provide a highly scalable and robust web-crawler component to the architecture.

Apache Nutch provides a solid web crawling solution that can easily snap-in to any backend data environment. In this case is really nice for seeding content to and from the Infinit.e platform which will be the primary focus of this post. Will also cover in examples how to also integrate directly with MongoDB as well as Elasticsearch. If you have not checked out some of the source you can find it here on Github.

The intent of this article is to be a primer for anyone wanting to integrate Apache Nutch into an alternative native input and output environment. This article is assuming a development familiarity with Nutch but if you are just getting started with the technology check out this tutorial.

Reasons why you might want to integrate Apache Nutch with Infinit.e

Infinit.e offers a platform that provides data enrichment services and a pipeline for processing custom analytics. It requires that data be added to the environment. Apache Nutch can be as source provider for information that can be added near real-time for many different types of cases where open web data is needed.

  • Open Source Analysis
  • Social Media Analysis
  • Financial Services Analysis
  • Product Analysis
  • Many more applications

Getting Dirty with Apache Nutch and its Source

Two topics we will discuss in this section are Parsing and Indexing. For the purposes of this article Parsing is the concept that is used to seed the Apache Nutch crawl application with URLs that are to be crawled. Indexing the the process of adding the crawled documents into a database environment for later retrieval. Lets first discuss the concept of Parsing.

If you are already familiar with these processes please skip down to next section in this post.

Parsing

Lets start by understanding the application that is going to used to parse and crawl your listing of URLs. This application is Nutch is called “crawl” (pretty good name!) and it allows you to do a few different things but first we need to create a directory and file for the listing of URLs that we want to Nutch to crawl.

Creating seed file

mkdir -p urlsnano seed.txt

Now add a few URLs to your seed.txt file. These can be one per line in the text file

Adding a few URLs to your seed file

http://www.ikanow.com
http://developer.www.ikanow.com
https://www.github.com/ikanow

Now we are ready to crawl some data but before we do here a brief explanation of the parameters that can be used in the “crawl” app.

  • -dir dir names the directory to put the crawl in.
  • -threads threads determines the number of threads that will fetch in parallel.
  • -depth depth indicates the link depth from the root page that should be crawled.
  • -topN N determines the maximum number of pages that will be retrieved at each level up to the depth.

Adding a few URLs to your seed file

bin/nutch crawl urls -dir crawl -depth 3 -topN 5

After you run this command you should be able to see a few new directories that have been created.

Directories that get created

crawl/crawldb
crawl/linkdb
crawl/segments

These files contain the results of your crawl. Now we are ready and primed to move to indexing and searching of the crawled information.

Indexing

Nutch out of the box is setup to integrate directly with Apache Solr and beneath it Apache Lucene. For the purposes of this article we are going to assume you already have a Solr Core up and running since we are really going to spending our time focused on putting data into Elasticsearch, MongoDB and Infinit.e

Indexing directly to Solr

bin/nutch crawl urls -solr http://localhost:8983/solr/ -depth 3 -topN 5

So now you have seen taking a list of urls that have been seeded into Apache Nutch and have been made available in Apache Solr for search and retrieval.

Now for the hard part (Getting your Development Environment up and Running)

The following sections focus on integrating new data environment with Apache Nutch to seed URLs and store the information processed by Apache. This will require us to actually modify the Apache Nutch source code and have a development environment set up do so. Lucky for all this is covered in the Apache Nutch documentation ( Run Nutch In Eclipse ). For the purposes of this article we are working with Apache Nutch 1.6. You will need to get a copy of the source code and you can get it here.

Understanding where to integrate your solution in the source

Apache Nutch source code provides two logical place places for integrating the Parsing and the Indexing. For the purposes of this blog post we are only going to focus on indexing.

To become familiar with how to create this logic recommend taking a look at the following files for Indexing.

Configuring Ivy to get your dependencies

In your IVY.XML file you will need to add the dependencies so that you can get the MongoDB and Elasticsearch JAVA drivers.

 

IVY Dependencies

         
         default"/>;
      
         default"/>

Integrating with Elasticssearch

This section provides examples how Elasticsearch can be integrated easily with the Apache Nutch source to Index data. In later blog posts we will discuss the ability to Parse data from Elasticsearch and utilize within Nutch.

To send the data that Nutch crawls to Elasticsearch you have to create both a Indexer and a Writer. The Indexer serves as the job client and call the Writer to perform the function of writing the information to the Elasticsearch repository. Once created this interact the with Nutch as the same as the Solr integration.

Elasticsearch Indexer

public class ElasticsearchIndexer  extends Configured implements Tool {
     public static Log LOG = LogFactory.getLog(ElasticsearchIndexer.class);
       public ElasticsearchIndexer() {
         super(null);
}
public ElasticsearchIndexer(Configuration conf) {
super(conf);
}
public void indexElasticsearch(String elasticsearchUrl, String elasticsearchPort, Path crawlDb, Path linkDb,
List; segments) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("ElasticsearchIndexer: starting at " + sdf.format(start));
final JobConf job = new NutchJob(getConf());
job.setJobName("index-elasticsearch " + elasticsearchUrl);
IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
job.set(ElasticsearchConstants.SERVER_URL, elasticsearchUrl);
job.set(ElasticsearchConstants.SERVER_PORT, elasticsearchPort);
NutchIndexWriterFactory.addClassToConf(job, ElasticsearchWriter.class);
job.setReduceSpeculativeExecution(false);
final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-" +
new Random().nextInt());
FileOutputFormat.setOutputPath(job, tmp);
try {
// run the job and write the records to infinite (this will be done via the rest api
JobClient.runJob(job);
long end = System.currentTimeMillis();
LOG.info("ElasticsearchIndexer: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
catch (Exception e){
LOG.error(e);
} finally {
FileSystem.get(job).delete(tmp, true);
}
}
public int run(String[] args) throws Exception {
if (args.length < 5) {
System.err.println("Usage: ElasticsearchIndexer <elasticsearch url> <elasticsearch port> <crawldb> <linkdb> (<segment> ... | -dir <segments>)");
return -1;
}
final Path crawlDb = new Path(args[2]);
final Path linkDb = new Path(args[3]);
final List<Path> segments = new ArrayList<Path>();
for (int i = 4; i < args.length; i++) {
if (args[i].equals("-dir")) {
Path dir = new Path(args[++i]);
FileSystem fs = dir.getFileSystem(getConf());
FileStatus[] fstats = fs.listStatus(dir,
HadoopFSUtil.getPassDirectoriesFilter(fs));
Path[] files = HadoopFSUtil.getPaths(fstats);
for (Path p : files) {
segments.add(p);
}
} else {
segments.add(new Path(args[i]));
}
}
try {
indexElasticsearch(args[0], args[1], crawlDb, linkDb, segments);
return 0;
} catch (final Exception e) {
LOG.fatal("ElasticsearchIndexer: " + StringUtils.stringifyException(e));
return -1;
}
}
// ./bin/nutch org.apache.nutch.indexer.mongodb.ElasticSearchIndexer localhost 9000 crawldb crawldb/linkdb crawldb/segments/*
public static void main(String[] args) throws Exception {
final int res = ToolRunner.run(NutchConfiguration.create(), new ElasticsearchIndexer(), args);
System.exit(res);
}
}

The purpose of the Writer is take the information that Nutch has collected and write that information into in this case Elasticsearch. You can see how you can make this as complex or more robust for your required solutions.

Elasticsearch Writer

public class ElasticsearchWriter  implements NutchIndexWriter{
private Client client;
private Node node;
@Override
public void open(JobConf job, String name) throws IOException {
String url = job.get(ElasticsearchConstants.SERVER_URL);
int port = Integer.parseInt(job.get(ElasticsearchConstants.SERVER_PORT));
// Need to do this if the cluster name is changed, probably need to set this and sniff the cluster
/* Settings settings = ImmutableSettings.settingsBuilder()
.put("cluster.name", "myClusterName").build()
.put("client.transport.sniff", true).build();*/
client = new TransportClient()
.addTransportAddress(new InetSocketTransportAddress(url, port));
//node = nodeBuilder().client(true).node();
//client = node.client();
}
@Override
public void write(NutchDocument doc) throws IOException {
// Set up the es index response
String uuid = UUID.randomUUID().toString();
IndexRequestBuilder response = client.prepareIndex("nutch", "index", uuid);
Map<String,Object> mp = new HashMap<String, Object>();
for(final Entry<String, NutchField> e : doc) {
for (final Object val : e.getValue().getValues()) {
String key;
// normalise the string representation for a Date
Object val2 = val;
if (val instanceof Date){
key = e.getKey();
val2 = DateUtil.getThreadLocalDateFormat().format(val);
mp.put(key, val2);
} else {
key = e.getKey();
mp.put(key, val);
}
}
}
// insert the document into elasticsearch
response.setSource(mp);
response.execute();
}
@Override
public void close() throws IOException {
client.close();
//node.close();
}
}

To run the above code you must have compiled and ensure that Nutch is up and running or you can run in debug mode in Eclipse.

Run

./bin/nutch org.apache.nutch.indexer.mongodb.ElasticSearchIndexer localhost 9000 crawldb crawldb/linkdb crawldb/segments/*

Integrating with MongoDB

This section provides examples how MongoDB can be integrated easily with the Apache Nutch source to Index data. In later blog posts we will discuss the ability to Parse data from Elasticsearch and utilize within Nutch. Much of the code is the same so will only really cover the primary differences in the logic in the Writer. This assumes you are including the MongoDB Java driver

MongoDB Writer

public class MongodbWriter  implements NutchIndexWriter{
private Mongo mongo;
@Override
public void open(JobConf job, String name) throws IOException {
mongo = new Mongo(job.get(MongodbConstants.SERVER_URL));
}
@Override
public void write(NutchDocument doc) throws IOException {
// Connect to a mongodb database
DB db = mongo.getDB( "nutch" );
DBCollection col = db.getCollection("index");
// Setup the mongodb db object
BasicDBObject mongoDoc = new BasicDBObject();
for(final Entry<String, NutchField>; e : doc) {
for (final Object val : e.getValue().getValues()) {
String key;
// normalise the string representation for a Date
Object val2 = val;
if (val instanceof Date){
key = e.getKey();
val2 = DateUtil.getThreadLocalDateFormat().format(val);
mongoDoc.put(key, val2);
} else {
key = e.getKey();
mongoDoc.put(key, val);
}
}
}
// insert the document into mongodb
col.insert(mongoDoc);
}
@Override
public void close() throws IOException {
if ( mongo != null ) {
mongo.close();
mongo = null;
}
}

This writes the information directly into a MongoDB database named “nutch” into a collection named “index”. To run the above code you must have compiled and ensure that Nutch is up and running or you can run in debug mode in Eclipse.

Run

./bin/nutch org.apache.nutch.indexer.mongodb.MongoDbIndexer localhost 9000 crawldb crawldb/linkdb crawldb/segments/*

Integrating with Infinit.e

MongoDB Writer

public class InfiniteWriter implements NutchIndexWriter{
Private String apiRoot;
Private String communityId;
@Override
public void open(JobConf job, String name) throws IOException {
communityId = job.get(InfiniteConstants.COMMUNITY_ID);
apiRoot = job.get(InfiniteConstants.API_ROOT);
apiRoot += "api/config/source/save/" + communityId // + "?json={...}";
}
@Override
public void write(NutchDocument doc) throws IOException {
forfinal Entry<String, NutchField>; e : doc) {
for (final Object val : e.getValue().getValues()) {
String key = e.getKey();
apiRoot += "?json=" + // SEE https://ikanow.jira.com/wiki/display/INFAPI/Config+-+Source+-+Save
// USING THIS FOR LOOP YOU WOULD PICK OUT THE PARAMETERS YOU NEED AND THEN RUN THE REQUEST
}
try {
// instantiate the URL object with the target URL of the resource to request
URL url = new URL(apiRoot);
// instantiate the HttpURLConnection with the URL object - A new connection is opened every time by calling the openConnection method of the protocol handler for this URL.
// 1. This is the point where the connection is opened.
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
// set connection output to true
connection.setDoOutput(true);
// instead of a POST, we're going to send using method="GET"
connection.setRequestMethod("GET");
// if there is a response code AND that response code is 200 OK, do stuff in the first if block
if (connection.getResponseCode() == HttpURLConnection.HTTP_OK) {
// OK
System.out.println(connection.getResponseCode());
System.out.println(connection.getResponseMessage());
// otherwise, if any other status code is returned, or no status code is returned, do stuff in the else block
} else {
// Server returned HTTP error code.
System.out.println(connection.getResponseCode());
}
} catch (MalformedURLException e) {
// ...
} catch (IOException e) {
// ...
}
}
// insert the document into mongodb
col.insert(mongoDoc);
}
@Override
public void close() throws IOException {
if ( mongo != null ) {
mongo.close();
mongo = null;
}
}

More to Come

In following posts, we will explain how one can Parse data from these open source tools and how they can begin to scale these examples into a larger production implementation (stay tuned).

About the Author: Chris Morgan

Chris is the President of IKANOW and works on strategy, core operations, customer engagements, product line expansion, sales and, prototyping software. Chris has 11 years of experience in project management, systems engineering design, software engineering design and development. In addition, he has extensive experience in operationalizing technology solutions into products that are both robust and practical.

Learn more

 

No comments

Leave a Reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>