Sunday, September 18, 2016

Concepts about Storm you need to know [Part 2]

There are some concepts that are always good to know when you are tearing your hair out over how Storm works.


Yaml correctness:

Storm's config files use YAML to define properties, so you might get an error like this when modifying the zookeeper servers:

Exception in thread "main" java.lang.ExceptionInInitializerError
    at org.apache.storm.config$read_storm_config.invoke(config.clj:78)
    at org.apache.storm.command.list$_main.invoke(list.clj:22)
    at clojure.lang.AFn.applyToHelper(AFn.java:152)
    at clojure.lang.AFn.applyTo(AFn.java:144)
    at org.apache.storm.command.list.main(Unknown Source)
Caused by: while scanning a simple key
 in 'reader', line 2, column 1:
    storm.zookeeper.port:2181
    ^
could not found expected ':'
 in 'reader', line 3, column 1:
    nimbus.host: "localhost"
    ^
caused by a storm.yaml containing
storm.zookeeper.servers: - "127.0.0.1"
nimbus.seeds: ["127.0.0.1"]
it's because the file doesn't conform to YAML syntax. You can validate it at http://www.yamllint.com/ or any other YAML validator website.
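For the record, the corrected snippet would look like this (list entries go on their own indented lines, and every colon needs a space after it, e.g. storm.zookeeper.port: 2181, not storm.zookeeper.port:2181):

storm.zookeeper.servers:
    - "127.0.0.1"
nimbus.seeds: ["127.0.0.1"]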


Logviewer:
When viewing topology details in the Storm UI (especially on localhost), the worker port numbers are shown as hyperlinks; clicking them shows the contents of the log files that Storm writes to.

This works only if you have started the logviewer, which is just another Storm process you have to start on that node.
Go to the Storm bin folder and run ./storm logviewer
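If the port links still don't open, check which port the logviewer is serving on. The port is controlled by logviewer.port in storm.yaml and, assuming you haven't changed it, defaults to:

logviewer.port: 8000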


Storm's slots:
Storm, by default, provides 4 "slots" on each node that you run it on. Why only 4?
A slot is used by a worker, and a worker is a JVM, so each worker needs a chunk of heap memory of its own. The default of 4 slots would therefore use 4*x memory, where x is the memory used by one worker JVM.
Obviously, if you declare more than 4 ports, that much more memory gets used. The problem with using too much memory is that your topologies will suddenly crash with a GC overhead limit exceeded exception and the spouts or bolts will get restarted constantly. As I understand it, if you want to run many workers/topologies, you'd be better off increasing the number of servers or the RAM on the existing server.
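A slot is really just a port a worker can listen on. The slots on a node are whatever ports are listed under supervisor.slots.ports in storm.yaml, so adding or removing entries is how you change the slot count. The stock default is these four ports:

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703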


Setting the amount of memory used by a topology:
From what I know, you can figure out how much memory your application needs and allocate the memory as shown below.
The code below sets a 2GiB maximum heap (-Xmx2g) for each worker of the topology.

Config stormConfig = new Config();
stormConfig.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xmx2g");


Fields grouping does not get overwhelmed with data:

You might think that the way Storm keeps track of unique field values for fields grouping is by keeping a HashMap of field values to bolt task IDs. That's not how it works.

This is the internal Storm code for fields grouping:

public List<Integer> chooseTasks(int taskId, List<Object> values) {
    int targetTaskIndex = Math.abs(TupleUtils.listHashCode(outFields.select(groupFields, values))) % numTasks;
    return Collections.singletonList(targetTasks.get(targetTaskIndex));
}


TupleUtils.listHashCode leads to
public static int listHashCode(List alist) {
  if (alist == null) {
      return 1;
  } else {
      return Arrays.deepHashCode(alist.toArray());
  }
}


So the routing is independent of which spout emits the tuple. All that matters is the value of the field(s) on which the fields grouping is done.
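To convince yourself, here is a tiny standalone sketch (plain Java, no Storm dependency; the task count and values are made up) that mimics the chooseTasks() logic above. The same field value always hashes to the same task index, no matter how often or from where it is emitted:

import java.util.Arrays;
import java.util.List;

public class FieldsGroupingDemo {
    //mimics Math.abs(TupleUtils.listHashCode(selectedFields)) % numTasks
    static int targetTaskIndex(List<Object> groupingValues, int numTasks) {
        return Math.abs(Arrays.deepHashCode(groupingValues.toArray())) % numTasks;
    }

    public static void main(String[] args) {
        int numTasks = 5;
        System.out.println(targetTaskIndex(Arrays.<Object>asList(2.0), numTasks));//some index i
        System.out.println(targetTaskIndex(Arrays.<Object>asList(2.0), numTasks));//the same index i again
        System.out.println(targetTaskIndex(Arrays.<Object>asList(7.0), numTasks));//possibly a different index
    }
}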



More help:


Understanding Fields Grouping in Apache Storm [Part 3: Understanding Fields Grouping]

Fields grouping can be confusing to understand from the examples on the internet, because they all declare some strings as the output fields, and those strings just don't seem to make any sense, given that we are supposed to route tuples based on some "fields".

So basically the concept is this:
  • In shuffle grouping, you send any tuple you want from a spout and Storm will decide which of the numerous bolt tasks it should reach.
  • In fields grouping, you send the same tuple from the spout, but you also specify a unique ID based on which Storm will, for example, always send tuples with id1 to bolt3, always send tuples with id4 to bolt5, and always send tuples with id6 to bolt5. Multiple IDs can go to the same bolt, but each ID will keep being sent to the same bolt.
Most examples will show you a tuple as a string and the string itself will be the unique ID based on which fields grouping is done.

But  that does not have to be the case.

You can even send other values as the fields of the tuple and specify one of those fields as the unique ID that Storm should use for doing fields grouping. 

Pay particular attention to the lines below that declare and use someRandomNumber, since that is the field the grouping is done on.

BasicStorm.java
package com.sdint.basicstorm;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class BasicStorm {

 public static void main(String[] cmdArgs) {

 Config config = new Config();
 config.put(Config.TOPOLOGY_DEBUG, false);
 config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
 config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30 second of tuple timeout to 10 second

 final String spoutName = DataSpout.class.getSimpleName();
 final String boltName = ProcessingBolt.class.getSimpleName();
 final Integer numberOfSpouts = 1;
 final Integer numberOfBolts = 5;
 String someRandomNumber = "someRandomNumber";
 String theSpoutId = "theSpoutId";

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout(spoutName, new DataSpout(), numberOfSpouts)
 .setNumTasks(numberOfSpouts);
 builder.setBolt(boltName, new ProcessingBolt(), numberOfBolts)
 .setNumTasks(numberOfBolts)
 .fieldsGrouping(spoutName, new Fields(someRandomNumber));

 LocalCluster localCluster = new LocalCluster();
 localCluster.submitTopology("BasicStorm", config, builder.createTopology());

 System.out.println("\n\n\nTopology submitted\n\n\n");
 StopWatch.pause(60*3);//pause for x seconds during which the emitting of tuples will happen (StopWatch is a small helper class, not shown here, that wraps Thread.sleep)

 //localCluster.killTopology("BasicStorm");
 localCluster.shutdown();
 }
}


DataSpout.java
package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
    private transient Integer spoutId;   
    private transient Integer numberOfEmits;
   
    private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
   
    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
        this.context = tc;
        this.collector = soc;
        this.spoutId = context.getThisTaskId();
        this.numberOfEmits = 0;
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        String someRandomNumber = "someRandomNumber";
        String theSpoutId = "theSpoutId";
        String aTupleName = "aTupleName";
        ofd.declare(new Fields(aTupleName, someRandomNumber, theSpoutId));
    }

    @Override
    public void nextTuple() {
       
        if (++numberOfEmits < 200) {
            Double randNum = Math.floor(Math.random() * 10);
            SomeTuple tup = new SomeTuple();
            collector.emit(new Values(tup, randNum, spoutId), tup);//(tuple, uniqueID for FieldsGrouping, spoutID), unique messageId
            logger.info("Emitting {}",randNum);
        }
        else {logger.info("NO MORE EMITS");}
       
        StopWatch.pauseMilli(300);//pause 300 ms between emits (StopWatch is a small helper class, not shown here)
    }
   
    @Override
    public void ack(Object msgId) {
    }   
   
    @Override
    public void fail(Object msgId) {
    }
   
}


ProcessingBolt.java
package com.sdint.basicstorm;

import java.util.HashMap;
import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProcessingBolt extends BaseRichBolt {

    private OutputCollector collector;
    private transient Integer boltId;
    private final Logger logger = LoggerFactory.getLogger(ProcessingBolt.class);
    private HashMap<Double, Integer> counter;//count of how many times each number was received

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
        collector = oc;
        boltId = tc.getThisTaskId();
        counter = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {

        //From the spout, Values(tup, randNum, spoutId) were sent, so we receive them here
        SomeTuple tup = (SomeTuple) tuple.getValue(0);//the first value in the fields
        Double num = (Double) tuple.getValue(1);//the second value sent in the fields
        Integer spoutId = (Integer) tuple.getValue(2);//the third value sent in the fields
        //Similarly, you could have sent more values from the spout and received it as tuple.getValue(3) or (4) etc.
        logger.info("Bolt {} received {} from spout {}", boltId, num, spoutId);
        CountTheNumberOfSimilarNumbersReceived(num);
        DisplayCounts();

        collector.ack(tuple);
    }

    private void CountTheNumberOfSimilarNumbersReceived(final Double num) {
        if (counter.containsKey(num)) {
            Integer existingCount = counter.get(num);
            counter.put(num, ++existingCount);//increment the count for this number
        } else {
            counter.put(num, 1);//first time this number has been received
        }
    }

    private void DisplayCounts() {
        logger.info("\n\n\n\nBolt {} has", boltId);
        for (Map.Entry oneVal : counter.entrySet()) {
            logger.info("{} number of {}", oneVal.getValue(), oneVal.getKey());
        }
        logger.info("\n\n\n");
    }

}


  • The tuple is the entire bunch of data being emitted. ie: The object that is emitted in the spout (new Values(tup, randNum, spoutId)). That's the same object that reaches the bolt (public void execute(Tuple tuple)).
  • The fields are the objects that formed the tuple (SomeTuple object, random number and spout Id)
  • Grouping of tuples (sending them to specific bolts) is done based only on the random number field because that's what you specified in the line  .fieldsGrouping(spoutName, new Fields(someRandomNumber));

So if random number is 1.0, the tuple might be sent to bolt 5.
If random number is 2.0, the tuple might be sent to bolt 3.
If random number is 2.0 again, the tuple will be sent to bolt 3.

That's all there is to it. If you want, you can even have the random number be the entire tuple. It could also be a String instead of a number, or any object you want. You can also re-order the fields in the tuple; just make sure the order matches what you declare in the declareOutputFields function.
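Also, instead of remembering positions like tuple.getValue(1), the bolt can read fields by the names declared in declareOutputFields. Inside execute(), something like this would be equivalent (field names taken from the example above):

SomeTuple tup = (SomeTuple) tuple.getValueByField("aTupleName");
Double num = tuple.getDoubleByField("someRandomNumber");
Integer spoutId = tuple.getIntegerByField("theSpoutId");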

You also need to know that fields grouping does not overwhelm Storm with data.


More on Apache Storm:

Saturday, September 17, 2016

How gravity based water filters purify water


Disclaimer: I am not an expert on chemicals or water filters. What is mentioned in this particular post about chemicals is a perception based on information sourced from the internet.


If you use an RO (Reverse Osmosis) filter and are familiar with the process of osmosis, then all you need to know is that RO is osmosis in the reverse direction, which happens when the water pressure is high. The heavy metals and salts in the water are retained within the confines of the membrane, and the water (chlorine included) gets across the membrane.
But gravity-based filters aren't like that. They depend on just gravity to push the water through a few filters and voila! somehow the water is purified. On asking the company how bacteria and viruses are killed, you'll get an ambiguous answer at best. In fact, an investigation into Hindustan Unilever's claim of killing one crore viruses showed it to be exaggerated.


So the first thing you learn about modern water filter companies is:
Don't trust their marketing. Ask extremely detailed questions. If you don't get proper answers, then don't buy the product.

Backstory:
My old water filter stopped functioning, and I had to get a new temporary water filter quickly. Looking at the reviews on Amazon, Hindustan Unilever's Pureit Advanced gravity based water filter seemed ok at that time, because I didn't need to filter borewell water (which would've had metallic impurities). I just needed it to filter the chlorinated corporation water. Ordered it via Pureit's helpline and it was promptly delivered.

After a few days of drinking it, I got an uneasy feeling in my stomach and lost my appetite. It was as though the stomach needed time to repair damage it had sustained. Oddly, even the construction workers we gave the water to refused to drink it after a few days.
On reporting it, a Pureit technician came to inspect the filter, said it might be a problem with the carbon polisher and, surprisingly, he himself refused to drink the water, saying he had had throat problems after drinking the water at various customers' houses. He seemed familiar with the problem.
After the carbon polisher was replaced, the problem continued. This time a different technician arrived, and he refused to drink the water too. He said he was familiar with the problem, replaced the 'germ-kill' kit and asked me to try using the filter again. The problem continued, and I had to ask them to take back the product. They took it back for half the price.


What a gravity-based purifier may rely on:

1. A Germkill processor: This is essentially nothing but bleaching powder. The same white powder used to sanitize swimming pools is used to disinfect your drinking water. The powder releases chlorine into the water, which kills microorganisms. The problem is, if your water is already chlorinated by your municipal corporation, then the water purifier just adds more chlorine, and that can be bad for your stomach.
 
In the case of Pureit, the germkill processor held this chemical in a little container which would slowly dissolve into the water; when it ran out, the red piece of plastic at the top would have moved to the bottom, visible to the user as a sign that the germkill kit needed replacement.





2. An advanced microfiber mesh: It's nothing but a porous material which allows water through and does not allow dirt through. Nothing advanced about it. It gets dirty quickly, and the technician advised cleaning it every week. So honestly, it does the job, but be prepared to clean it every week.



3. Carbon polisher: This is a carbon-based fabric supposed to remove pesticides, parasites (really? So the germkill kit doesn't work?) and chlorine, but that also means you need to change your carbon cartridge often because it can't adsorb (not absorb) chemicals forever.



4. Microcharged membrane: It's just another fancy name for something that claims to remove harmful parasites (really? So the germkill kit and the carbon polisher didn't work yet?). It's a hollow cylinder with a clump of white wires at the bottom. The technician told me it was there to give the water a sweet taste.


So that's the process. Chlorinate the water, remove dirt, remove chlorine, add a sweet taste. This is claimed to be "better than boiling", but it'd take a very gullible person to believe that.

The most visible and alarming indicator was that their own technicians were refusing to drink the "purified" water. I wrote to the company requesting them to do further checks and if necessary, initiate a product recall in the interest of public safety.

As for you, if you get municipal corporation water, boiling it would be a much more reliable way to get rid of bacteria. Although boiling doesn't get rid of all pathogens, it does reduce them to a safe level. An acquaintance who worked for a water purifier company told me that the old water filters used to be much better and more reliable than the modern ones. The modern ones bank on getting money out of you through annual maintenance contracts and components like the germkill kit and carbon polisher, which need frequent replacement.

We need a better solution than relying on unethical corporations for a basic need such as clean drinking water.



Saturday, September 10, 2016

Aha!



Continued from the previous Aha!


Cashflow





Will be continued...

Saturday, August 13, 2016

How to network at conferences

I came across a post by an entrepreneur on Office Chai about why he stopped attending startup conferences.
He says:
"And it sure was fun. In the beginning, the a-ha moment used to be spotting a celebrity. “I’m in the same room as Sachin Bansal!”....
And I wasn’t my usual self at these conferences. I’m an engineer, and a bit of an introvert, but a switch used to flick on in my head as soon as I had a conference badge around my neck. I’d be approaching people, exchanging business cards, and talking to anyone who’d listen about my startup.
Except that it never went anywhere"

and then...

"It took me two years (and lots of money in conference fees) to figure out what the problem was. Conferences are horribly artificial ways to meet people. Everyone is trying to meet as many people in as little time as possible, and the result is that interactions are superficial and hackneyed. Real relationship-building time and effort. By trying to meet everyone, you end up meeting no one."


What this entrepreneur mentions here is one of the classic misconceptions people have when they attend conferences. What I'm going to write below is not my own wisdom, but something I read from a source I don't recollect (I think it was Rajesh Shetty's blog). The words of advice stayed in my mind and I've been practicing them too.

Here's what you need to remember:

Everyone who attends conferences and networking events has just one thing in mind: "What's in it for me?". So when you attend with the same thing in mind, how do you expect others to help you, when they are there trying to figure out how others can help them?
When you go to such events, go with this thought in mind "How can I bring value to someone else?" or "How can I truly make myself or my business useful to someone else?".

When you have this mindset, you'll automatically eliminate the conversations that only help you. You'll start thinking from the other person's perspective and truly evaluating whether you will actually be able to help that person or not. When you see that you can help the person and you explain it, the person will obviously see that there is a point in remaining in touch with you.

The relationship is retained and it grows.

Sometimes you may not see any opportunity to add value to anyone, but you will meet people with whom you feel a natural connection. When you meet these people repeatedly, a familiarity and friendship does evolve.

Networking and conferences are indeed useful when you stop seeing them from the perspective of "What's in it for me?".

Saturday, August 6, 2016

Aha!

Continued from the previous Aha!


Internet




Continued in the next Aha!


Monday, July 11, 2016

The better way to get date and time on your photos

On one of the latest Sony cameras, I switched on a feature that imprints the date the photograph was taken, onto the bottom right corner of the photo.

I was shocked at the result.

Sony's software puts a fat orange date stamp on the photo without even antialiasing it. Looks like we've been transported to the 1970's.

Searching for a better alternative led me to a script written by Terdon that uses exiv2 to read the EXIF information from the picture and ImageMagick to draw the date onto it.

  • You can run it via the linux commandline
  • It's fast 
  • Doesn't mess up your existing image and 
  • Is also configurable

See the difference between Sony's orange date stamp and imagemagick's white one.



Here's what to do at your bash terminal:

sudo apt-get install imagemagick
sudo apt-get install exiv2
  
Then put this script in a file named watermark.sh:

#!/usr/bin/env bash

## This command will find all image files, if you are using other
## extensions, you can add them: -o "*.foo"

find . -iname "*.jpg" -o -iname "*.jpeg" -o -iname "*.tif" -o \
 -iname "*.tiff" -o -iname "*.png" |

## Go through the results, saving each as $img
while IFS= read -r img; do
 ## Find will return full paths, so an image in the current
 ## directory will be ./foo.jpg and the first dot screws up
 ## bash's pattern matching. Use basename and dirname to extract
 ## the needed information.

 name=$(basename "$img")
 path=$(dirname "$img")
 ext="${name/#*./}";

 ## Check whether this file has exif data
 if exiv2 "$img" 2>&1 | grep timestamp >/dev/null
 ## If it does, read it and add the water mark
 then
 echo "Processing $img...";
 convert "$img" -gravity SouthEast -pointsize 22 -fill white \
 -annotate +30+30 %[exif:DateTimeOriginal] \
 "$path"/"${name/%.*/.time.$ext}";
 ## If the image has no exif data, use the creation date of the
 ## file. CAREFUL: this is the date on which this particular file
 ## was created and it will often not be the same as the date the
 ## photo was taken. This is probably not the desired behaviour so
 ## I have commented it out. To activate, just remove the # from
 ## the beginning of each line.


 # else
 # date=$(stat "$img" | grep Modify | cut -d ' ' -f 2,3 | cut -d ':' -f1,2)
 # convert "$img" -gravity SouthEast -pointsize 22 -fill white \
 # -annotate +30+30 "$date" \
 # "$path"/"${name/%.*/.time.$ext}";

 fi
done


Make the file executable and run it:

chmod +x watermark.sh
./watermark.sh

Run it in the folder where your images are placed.

Works like a charm!

Sunday, July 3, 2016

The ingenuity of the humble zipper

"Clasp locker" and "hookless fastener". That's what the zip was called, once-upon-a-time.

See how the clasps get locked. A rather ingenious invention by Whitcomb L. Judson.




The reason I'm writing about it today, is because the zippers of my bag stopped working and I thought I'd have to replace the zipper teeth and the slider.

The person at the shop, though, told me I'd only have to change the slider. It turns out that with time the insides of the slider wear out, and it can no longer make the zipper teeth clasp together. The teeth themselves are still fine, and you don't have to replace them.

Replacing involves removing some of the stitches that hold the teeth...


 ...removing the old slider completely, replacing it with a new slider and stitching it back up with a sewing machine.


That's it!
This entire process took less than 10 minutes, and cost me Rs.30 for a large slider and Rs.20 for a small one (stitching charges included).


The next time you worry about throwing away your bag or jacket because of a defective zipper, remember this post. You can get it fixed in a jiffy. If possible, try to get a new slider that matches your bag/jacket. The person at the shop might not have a variety of colours.

Friday, July 1, 2016

Discovering the unknown

X-rays were discovered by accident. We have never seen electrons. We don't know whether light is a particle or a wave, because of the properties we have observed. We don't see or hear many wavelengths that insects and other animals can.

In order for our senses of sight, smell, touch, taste and hearing to receive this data and for the brain to process it, we needed to convert X-rays into something that we could see, and electron paths into something we could experiment upon and prove.

We really are very limited by our senses.

But what if we created a machine and an AI that was capable of manufacturing sensors which would detect properties of the Earth and Universe which we never discovered, and then translate that into data that we can understand?

Not only would we be able to discover so many more dimensions, we might even be able to re-program our DNA to take advantage of that data.

How, is the question.

Tuesday, June 14, 2016

Aha!

Continued from the previous Aha!


Shoe manufacturing



Continued in the next Aha...


Saturday, May 14, 2016

Aha!

Continued from the previous Aha!


Egotistical vehicles!


Continued in the next Aha
 

Wednesday, May 11, 2016

The 3D hologram on the smartphone actually works!

A colleague of my classmate showed me an article & video of a hologram created on a smartphone, and I didn't quite believe it.



It did make me curious though. Holograms & VR are something I'd love to work on, so I actually tried it out.

Find a CD case or even transparent plastic or glass. Perhaps even a plastic bottle as one person did.


Cut out four trapezium-shaped pieces from it. Drawing a diamond shape as shown below is by far the most convenient and space-efficient way of doing it. Once you've cut it out on paper, stick it to the plastic with cello-tape and carefully cut the plastic.
 
You end up with this

Super-glue would be a better option for sticking it, but I just used cello-tape. Make sure the 1cm edge part is even, because that's the part you'll be keeping on your smartphone.

Now take your pick from the various videos available.


and watch it come alive!






Turns out there are far more impressive holographic illusions than the smartphone illusion. Play the one below and see.



And all this isn't modern technology. Such illusions have existed since the year 1584. There are plenty of examples scattered across history. More recently, holograms were used in Mr.Narendra Modi's election campaign!

It all began when an Italian scientist named Giambattista della Porta created the illusion of what later came to be known as Pepper's ghost.

This is how the illusion works:


Modern technology comes into play when you want to actually touch the hologram. As of today, that's made possible in a small way with lasers.




Beautiful!



How the smartphone hologram illusion works

In reality, the smartphone hologram is not a hologram at all. It's simply a reflection that can be viewed from all 4 sides. I remember when a person first showed me the video, my first question to him was "what happens if we remove one of the plastic panes?"

While the video is playing, try putting your finger into the middle of the plastic pyramid you created. You'll still be able to see the 'hologram' because your finger isn't blocking anything. In fact, instead of creating four trapeziums, if you held just one pane of plastic or glass at that angle, facing you, you'd still be able to see the 'hologram'.

When you stand 2 feet in front of a mirror, your mirror image appears to be 2 feet behind the glass. That's the exact same illusion the plastic trapezium creates. At that angle, it just reflects the video, making you think the image is somewhere behind the plastic, in the air. The reflections from the other three panes aren't visible to you because they are at angles that don't reflect or refract the light in your direction. The only way having 4 trapeziums helps, is that you get to see the reflection suspended in the air, in the four directions you view it from. It also creates the illusion that an object that looks three dimensional, is contained within the pyramid shaped container you created. That's all there is to it.
Still...impressive!

Makes me wonder....could a rainbow be called a real hologram?


Saturday, May 7, 2016

More concepts of Apache Storm you need to know

As my mentor tells me -  
"To be able to program in Storm, you first need to think in Storm".


That's very true. Storm is designed so that you can structure your application in a way that allows it to scale. But before you begin that journey, there are a few concepts you need to know which aren't explained in the official documentation (or are sometimes not explained clearly enough).




The Constructor concept
[Thanks to Matthias J. Sax on the Storm mailing list for explaining this]

When you create a Spout or a Bolt, Storm calls that class's constructor only once. After that, the object gets serialized, and from then on, whenever Storm needs to create a new instance of the Spout or Bolt, it creates it from that serialized instance.

But for every new instance Storm creates, Storm will call the open() function for Spouts and the prepare() function for Bolts.
So open() and prepare() are like constructors.



Making your Spouts or Bolts do things at specified intervals

Use tick tuples.
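For example, a bolt can ask Storm to deliver it a tick tuple at a fixed interval by overriding getComponentConfiguration(), and then check for the tick in execute(). A sketch of the two methods inside a BaseRichBolt subclass (the 60-second interval is just an example; org.apache.storm.Config and org.apache.storm.utils.TupleUtils need to be imported):

@Override
public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);//ask Storm for a tick tuple every 60 seconds
    return conf;
}

@Override
public void execute(Tuple tuple) {
    if (TupleUtils.isTick(tuple)) {
        //do the periodic work here
    } else {
        //normal tuple processing here
    }
    collector.ack(tuple);
}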



The reason you should exit the nextTuple() function ASAP
[Thanks to Spico Florin on the Storm mailing list for explaining this]

I had created a while loop in nextTuple() of my Spout to emit multiple tuples, but I didn't receive any acks at all.
It turns out that nextTuple() and the ack() method are called on the same thread by the framework. So if you do heavy computation in nextTuple(), your ack() method will never be called, and the buffers responsible for receiving the ack messages will not be emptied. nextTuple() acts as a producer for these buffers, while ack() acts as a consumer.

So remember to emit a tuple and exit the nextTuple() function immediately.
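In practice this means nextTuple() should do only a small, bounded amount of work per call, for example pulling one item off an internal queue and emitting it. A sketch (pendingItems is a hypothetical queue that some other thread fills):

@Override
public void nextTuple() {
    String item = pendingItems.poll();//non-blocking; returns null if the queue is empty
    if (item == null) {
        org.apache.storm.utils.Utils.sleep(1);//nothing to emit, so yield briefly and return
        return;
    }
    collector.emit(new Values(item), item);//emit a single tuple and exit immediately
}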
For those who don't know about the ack() method, you can override it (and the fail() method) in your Spout like this:

    @Override
    public void ack(Object msgId) {
        System.out.println("Ack received for Spout"+msgId);
        tupleAck = true;
    }   
   
    @Override
    public void fail(Object msgId) {
        System.out.println("Failed tuple msgID: "+msgId);
        //---tuple replay logic should be here
    }


This helps you know whether your tuple was received and processed by the Bolt or whether the transmission or processing failed.



More creative topology structures
[Thanks to Matthias J. Sax on the Storm mailing list for the idea]

When learning Storm, we come across simple examples and are conditioned into thinking that way.



It doesn't have to be that way though. When you use streams and the various techniques of grouping, you'll find a whole new world open up.

Example:
If you want to create a topology where the spout notifies the end bolts that it has no more input, you can do it this way:

Just specify a separate stream in the spout and emit the notification tuples. When creating the topology, specify an allGrouping for the receiving bolts. What happens is that no matter how many instances of the bolt are created, the spout will send the tuple to all of them. It's like a broadcast.

So the topology would be created like this:

TopologyBuilder b = new TopologyBuilder();

b.setSpout("SpoutA_name", new SpoutA(), 1)
.setNumTasks(1);      
     
b.setBolt("boltA_name", new boltA(), 2)
.shuffleGrouping("SpoutA_name");

b.setBolt("boltB_name", new boltB(), 5)
.fieldsGrouping("boltA_name", new Fields(someID))
.allGrouping("SpoutA_name", "StreamName");



This is how the spout sends a stream to the bolts at the end:

@Override
public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declare(new Fields("theNormalTuple"));
    ofd.declareStream("StreamName", new Fields("someID"));//this specifies the stream that reaches the end bolt B
}
   
@Override
public void nextTuple() {       

    if (nothingMoreToProcess) {
        collector.emit("StreamName", new Values(someID));//this emits the stream to bolts B
    }       
    else {
        collector.emit(new Values(someTuples), someTuples);//this emits to bolts A
    }
}     



...and this is how the bolt receives it:

@Override
public void execute(Tuple tuple) {
    
    if (("StreamName").equals(tuple.getSourceStreamId())) {//recognizes the stream from the spout
        //do whatever you do when there is nothing more to process
    }
    else {  
        //do your usual processing
    }
}


Don't stop here. There are plenty more ways to think of emitting streams and directing the flow. Remember that a Storm topology is meant to be a Directed Acyclic Graph, and you can design yours as such.




In the code, which are the tasks and executors?

There's often confusion about tasks and executors. For example, in this case:

builder.setBolt("bolt1", new BoltA(), 3)
                .setNumTasks(2)

Storm creates 3 executors and 2 tasks.

but

In this case (if you don't specify setNumTasks)

builder.setBolt("bolt1", new BoltA(), 2) 

Storm creates 2 executors and 2 tasks.

Remember that a task is an instance of the BoltA class, created from the serialized instance (see the constructor concept at the top of this page). An executor is just a thread which runs tasks. If an executor is assigned two tasks, it runs them serially: it processes the first one and only then processes the second one.
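The practical consequence is that the number of tasks is fixed once the topology is submitted, but the number of executors can be changed at runtime with the rebalance command (the topology and component names below are placeholders):

storm rebalance myTopologyName -e bolt1=4

This would change the number of executors running the bolt1 component to 4, as long as it does not exceed that component's number of tasks.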





Additional links:

If you are looking for a simple tutorial or code sample for Storm, they are here:

Sunday, May 1, 2016

Mass brainwashing

Do you know that diamonds are not a woman's best friend?
Well, allow me to introduce you to the diamond myth:

"...diamonds only came into popularity in the 20th century...But in 1870, a huge cache of diamonds was unearthed in South Africa...With a voluble increase in available diamonds, how could they be priced for their scarcity and rareness? Diamond mine financiers realized that they needed to protect their interests...And that’s how De Beers — which held a monopoly on diamond mining and pricing — was created...

In the 1930s...to promote diamond story lines, place engagement rings in films and get diamonds into the hands of burgeoning celebrities...in the 1940s, a series of lectures was promoted in high schools...

All the advertising, film and television placement and mass psychology worked. After 20 years on the campaign...the younger generation had successfully been implanted with the idea that diamonds were a de rigeur part of courtship. To this new generation a diamond ring is considered a necessity to engagements...

...De Beers sold the idea that a diamond was an expensive but necessary token of affection...Conversely, a woman who didn’t have an engagement ring –who wasn’t showered with diamonds throughout her relationship — was somehow “less loved” than her diamond-swathed counterparts....It’s a lie that started less than a 100 years ago, and it’s a lie the diamond industry has been banking on ever since."

I found this report very intriguing. Haven't we all been brainwashed in similar ways? Made to feel that doing certain things were an absolute necessity to be accepted in society?

Even before mass advertising took over the planet, there were Shamans, Rainmakers, Soothsayers, Witch doctors, Oracles and Astrologers. After that came the need to drink Complan to gain height. To drink Bournvita to gain capability. To remember only Maggi for a quick snack. To drink only Coca Cola / beer when thirsty. To drink Boost / Glucon D for energy.
As though there were no other cheaper, healthier and much much better alternatives available!


Religion

Take religion for example. What exactly does your religion actually want you to do?

To help people. To respect and be kind to others. To live peacefully. To recognize and appreciate that a beautiful universe may have been created by a much wiser, powerful being.

Is this what religious people actually do? What I see religious people doing is robotically following a set of pre-defined rituals, organizing cash-flow, killing in the name of religion, spreading fear, superstition, greed, ignorance, anger, groupism, hatred and selfishness.

It's very surprising that even grown adults don't realize that they would have been following the rituals of some other religion if they were born into that family. It isn't surprising though, that most people think they are being religious by simply following rituals. They forget what their religion actually wants them to do. Such is the power of mass brainwashing, fear and hysteria.


Social Customs

Same applies to other social customs. Going out for a movie and dinner is somehow considered cool. Having a party at a club, going out for a company-sponsored lunch, volunteering grandly for one hour...

These may be enjoyable sometimes. These may be enjoyable to some people. But do you really enjoy it?
Would you find it more enjoyable to read an interesting book? To go on a long drive? To explore places?


A comic by Zen Pencils captures this nicely: http://zenpencils.com/comic/nerdist/


We live in a society that sees someone doing something they love, sees the happiness on their face and somehow believes that if we do the same thing, we would be happy too. What would really make you happy is the removal of the pressure to imitate others. To realize your interests and to do what makes you happy, no matter what the brainwashed masses think of it. It is in that moment that we find true peace and joy.
- Navin Ipe


That's also when you realize the true meaning of "Be yourself".

Of course, it's also important to keep in mind the laws of the land, the practicalities of finance, your dependents and responsibilities of life.

Think for yourself, people. You own your mind, you have the right to know the truth and to live life the way you wish.

Wednesday, April 27, 2016

Aha!

Continued from the previous Aha!


Abracadabra traffic!




Continued in the next Aha!
 

Monday, April 18, 2016

A simple Apache Storm tutorial [Part 2: Implementing failsafes]


Continued from part1


If you really want to understand what the Values class is, what the Tuple class is, etc., the best place to look is not the tutorials on the internet. Look at the actual Storm source code.
It's available here: https://github.com/apache/storm
Go into the "storm-core/src/jvm/org/apache/storm" folder and have a look at those Java files. The code is very simple to understand and I promise you, it will be an enlightening experience.

Now, onto the ack and fail aspects of Storm.

Given below is the exact same program as in Part 1 of this tutorial, with the ack/fail handling added. The sections that need your attention are the tupleAck/oldTupleValue logic in nextTuple() and the ack() and fail() overrides in DataSpout, along with the collector.ack(tuple) call in the bolt.


BasicStorm.java:

package com.sdint.basicstorm;

import org.apache.storm.Config;

import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class BasicStorm {

    public static void main(String[] cmdArgs) {
       
        Config config = new Config();
        //config.put(Config.TOPOLOGY_DEBUG, false);
        config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30 second of tuple timeout to 10 second
       
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("myDataSpout", new DataSpout());
       
        builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");
       
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("BasicStorm", config, builder.createTopology());
       
        System.out.println("\n\n\nTopology submitted\n\n\n");
        pause(120);//pause for 120 seconds during which the emitting of tuples will happen
       
        //localCluster.killTopology("BasicStorm");
        localCluster.shutdown();
    }//main


    public static void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
        catch (InterruptedException e) {System.out.println(e.getCause());}
    }
 }//class


DataSpout.java:

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
    private TopologyContext context;
    private SpoutOutputCollector collector;
   
    //---logger
    private final Logger logger = LoggerFactory.getLogger(DataSpout.class);
   
    private boolean tupleAck = true;
    private Long oldTupleValue;
   
   
    @Override
    public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
        this.context = tc;
        this.collector = soc;
       
        System.out.println("\n\n\nopen of DataSpout\n\n\n");      
    }
   
    public DataSpout() {
        System.out.println("\n\n\nDataSpout ctor called\n\n\n");
    }//ctor

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");
       
        ofd.declare(new Fields("line"));
    }

    @Override
    public void nextTuple() {
        System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");
       
        Long newTupleValue;
        if (tupleAck) {
            newTupleValue = System.currentTimeMillis() % 1000;
            oldTupleValue = newTupleValue;
        }
        else {newTupleValue = oldTupleValue;}

       
        this.collector.emit(new Values(newTupleValue), newTupleValue);
        System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");
        pause(1);
    }
   
    @Override
    public void ack(Object msgId) {
        System.out.println("\n\n\nAck received for DataSpout"+msgId+"\n\n\n");
        tupleAck = true;
    }   
   
    @Override
    public void fail(Object msgId) {
        System.out.println("\n\n\nFailed tuple msgID: "+msgId+"\n\n\n");
        //replay logic should be here
        tupleAck = false;
    }

 

    public void pause(int timeToPause_InSeconds) {
        try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
        catch (InterruptedException e) {System.out.println(e.getCause());}
    }
    
}//class



ProcessingBolt.java:

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ProcessingBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer ofd) {
        System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");
    }

    @Override
    public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
        System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
        collector = oc;
    }

    @Override
    public void execute(Tuple tuple) {
        System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");
        collector.ack(tuple);
    }

   
}



Notice that this time when you run the program, the ack function in the Spout will get called whenever the Bolt executes the collector.ack(tuple); statement.

But suppose you comment out collector.ack(tuple);, then after a certain time period (normally 30 seconds, but in our program we made it 10 seconds), the fail function will get called.

This is how the Spout (and we) know whether a tuple has been received by the Bolt and acknowledged or not. The above program basically uses the System time as a Tuple and in case the Bolt does not acknowledge that it has received the Tuple, then the Spout sends the same old Tuple to the Bolt again.




And before getting into hardcore Storm programming, there is this important thing:

Apache Storm concepts you really need to know.



A simple Apache Storm tutorial [Part1]

Apache Storm is actually well documented. Problem is, you won't understand any of it until you actually try out some code (even if it's in the form of a nice explanation by Chandan Prakash), and there's a dearth of simple code available on the internet. NRecursions comes to your rescue :-)

To run this program, you can either do it with Gradle (which I've used mainly so that the jar dependency management is handled automatically by Gradle) or you could simply create a normal Java project and manually add the necessary jars. The jars you'll need are:
  • asm-5.0.3.jar
  • bson4jackson-2.7.0.jar
  • clojure-1.7.0.jar
  • disruptor-3.3.2.jar
  • kryo-3.0.3.jar
  • log4j-api-2.1.jar
  • log4j-core-2.1.jar
  • log4j-over-slf4j-1.6.6.jar
  • log4j-slf4j-impl-2.1.jar
  • logback-classic-1.1.3.jar
  • logback-core-1.1.3.jar
  • minlog-1.3.0.jar
  • objenesis-2.1.jar
  • reflectasm-1.10.1.jar
  • servlet-api-2.5.jar
  • slf4j-api-1.7.12.jar
  • storm-core-1.0.0.jar

Create a Gradle project named "BasicStorm" and create a source package named "com.sdint.basicstorm".

Within that package, create BasicStorm.java

package com.sdint.basicstorm;

import org.apache.storm.Config;

import java.util.concurrent.TimeUnit;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

public class BasicStorm {

 public static void main(String[] cmdArgs) {

 Config config = new Config();
 //config.put(Config.TOPOLOGY_DEBUG, false);
 config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
 config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);//alters the default 30 second of tuple timeout to 10 second

 TopologyBuilder builder = new TopologyBuilder();
 builder.setSpout("myDataSpout", new DataSpout());

 builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");

 LocalCluster localCluster = new LocalCluster();
 localCluster.submitTopology("BasicStorm", config, builder.createTopology());

 System.out.println("\n\n\nTopology submitted\n\n\n");
 pause(120);//pause for 120 seconds during which the emitting of tuples will happen

 //localCluster.killTopology("BasicStorm");
 localCluster.shutdown();
 }//main


 public static void pause(int timeToPause_InSeconds) {
    try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
    catch (InterruptedException e) {System.out.println(e.getCause());}
}

}//class



and DataSpout.java

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSpout extends BaseRichSpout {
 private TopologyContext context;
 private SpoutOutputCollector collector;

 //---logger
 private final Logger logger = LoggerFactory.getLogger(DataSpout.class);

 @Override
 public void open(Map map, TopologyContext tc, SpoutOutputCollector soc) {
 this.context = tc;
 this.collector = soc;

 System.out.println("\n\n\nopen of DataSpout\n\n\n");
 }

 public DataSpout() {
 System.out.println("\n\n\nDataSpout constructor called\n\n\n");
 }//ctor

 @Override
 public void declareOutputFields(OutputFieldsDeclarer ofd) {
 System.out.println("\n\n\ndeclareoutputfields of DataSpout\n\n\n");

 ofd.declare(new Fields("line"));
 }

 @Override
 public void nextTuple() {
 System.out.println("\n\n\nnexttuple of DataSpout\n\n\n");

 Long newTupleValue =System.currentTimeMillis() % 1000;

 this.collector.emit(new Values(newTupleValue), newTupleValue);
 System.out.println("\n\n\nEmitting "+newTupleValue+"\n\n\n");
 pause(1);
 }


 public void pause(int timeToPause_InSeconds) {
    try {TimeUnit.SECONDS.sleep(timeToPause_InSeconds);} 
    catch (InterruptedException e) {System.out.println(e.getCause());}}
}//class




and ProcessingBolt.java

package com.sdint.basicstorm;

import java.util.Map;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class ProcessingBolt extends BaseRichBolt {
 private OutputCollector collector;

 @Override
 public void declareOutputFields(OutputFieldsDeclarer ofd) {
 System.out.println("\n\n\ndeclareOutputFields of ProcessingBolt called\n\n\n");
 }

 @Override
 public void prepare(Map map, TopologyContext tc, OutputCollector oc) {
 System.out.println("\n\n\nprepare of ProcessingBolt called\n\n\n");
 collector = oc;
 }

 @Override
 public void execute(Tuple tuple) {
 System.out.println("\n\n\nTuple received in ProcessingBolt:"+tuple+" \n\n\n");
 collector.ack(tuple);
 }

}




Your build.gradle file would look like this (if you choose to create a Gradle project. You won't need this if you're creating a simple Java project):

apply plugin: 'java'
apply plugin: 'eclipse'

defaultTasks 'jar'

jar {
 from {
        (configurations.runtime).collect {
            it.isDirectory() ? it : zipTree(it)
        }
    }   
    manifest {
        attributes 'Main-Class': 'com.sdint.basicstorm.BasicStorm'
    }
}

sourceCompatibility = '1.8'
[compileJava, compileTestJava]*.options*.encoding = 'UTF-8'

if (!hasProperty('mainClass')) {
    ext.mainClass = 'com.sdint.basicstorm.BasicStorm'
}

repositories {
    mavenCentral()
}

dependencies { 
    //---apache storm
    compile 'org.apache.storm:storm-core:1.0.0'  //compile 'org.apache.storm:storm-core:0.10.0'
    //---logging
    compile "org.slf4j:slf4j-api:1.7.12"
    compile 'ch.qos.logback:logback-classic:1.1.3'
    compile 'ch.qos.logback:logback-core:1.1.3'   
       
    testCompile group: 'junit', name: 'junit', version: '4.10'

}



Hope you've already read a little about how Spouts and Bolts work in Storm.

This is what happens in BasicStorm:

  • The moment a Spout or Bolt is instantiated, its declareOutputFields function gets called. For our simple program we don't need to know what it is, so let's ignore it for now.
  • When the topology is submitted to Storm, the open function gets called for Spouts, and this function gets called only once for a Spout instance.
  • For Bolts, the equivalent of open is the prepare function which gets called. You can use open and prepare to do initializations, declarations etc. for your program.
  • After submitting the topology, we pause for a while in main() (pause(120);) so that Storm would get time to run the topology. This is the time during which Storm calls nextTuple() of the Spout, and when the Spout emits a Tuple, the Tuple is sent to the Bolt (because in main() we configured the Bolt to receive Tuples from the Spout. See the line builder.setBolt("proBolt", new ProcessingBolt()).shuffleGrouping("myDataSpout");). 
  • When the Bolt receives the value, the execute() function of the Bolt is called.
  • BasicStorm is designed for a simple task. DataSpout emits a Long value (it's a Tuple), ProcessingBolt receives it and ProcessingBolt acknowledges (the collector.ack(tuple); line) that it has received the Long value and that the data processing for the tuple is complete.
  • When DataSpout receives the acknowledgement, it calls nextTuple() again and another tuple gets emitted.
  • This process keeps going on for the 120 seconds we have paused the main() thread for. After that, the topology shuts down.

Try tweaking the values here and there to find out how the program works. Try substituting some other value in place of the Long for the tuple. Try substituting it with a class object; remember that if you do, that class has to implement Serializable (a minimal sketch is shown below).
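For instance, a class like this could be emitted in place of the Long (a sketch; the class name and field are made up):

import java.io.Serializable;

public class MyData implements Serializable {
    private final long createdAt;

    public MyData(long createdAt) {
        this.createdAt = createdAt;
    }

    public long getCreatedAt() {
        return createdAt;
    }
}

In DataSpout's nextTuple() you would then emit something like new Values(new MyData(System.currentTimeMillis())) instead of the Long.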

One more thing to try out:
Try commenting out collector.ack(tuple); in ProcessingBolt.java. You'll basically not be telling the Spout that the tuple has been received by the Bolt, so after some time the tuple is considered failed and the Spout gets to emit again. The interval of time the Spout waits for an acknowledgement (ack) is normally 30 seconds, but if you scroll up you'll see that in main() we set it to 10 seconds (config.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 10);), so the Spout waits just 10s before treating the tuple as failed and emitting again.



Continued in Part 2