
Discardable Memory and Materialized Queries
http://hortonworks.com/blog/dmmq/

Materialized views with adapters for MongoDB, Apache Drill, and Spark

http://calcite.incubator.apache.org/

Nice article on probabilistic methods for aggregation

Probabilistic Data Structures for Web Analytics and Data Mining

Nice article on Sketches

Linked PDF: sketches1.pdf

Apache ZooKeeper is a distributed service providing application infrastructure for common distributed coordination tasks such as configuration management, distributed synchronization primitives such as locks and barriers, and leader election. ZooKeeper can be used for cluster membership operations such as electing a leader and adding and removing nodes. In the Hadoop ecosystem, ZooKeeper is used for high availability of the YARN ResourceManager, HDFS NameNode failover, the HBase Master, and the Spark Master.

The ZooKeeper service provides the abstraction of a set of data nodes, called znodes, organized into a hierarchical namespace. The hierarchy of znodes in the namespace provides the objects used to keep state information. Znodes are addressed by paths in the hierarchy, much like files in a file system.

ZooKeeper uses the Zab atomic broadcast protocol, which is similar to the classical Paxos algorithm. In both Zab and Paxos, a leader proposes values to the followers and waits for acknowledgements from a quorum of followers before considering a proposal committed. Proposals carry epoch numbers (ballot numbers in Paxos) that act as unique version numbers.
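
A minimal sketch of the quorum rule, assuming a leader that simply counts acknowledgements per proposal. The names and structure are illustrative, not ZooKeeper's actual implementation:

// Illustrative sketch of a Zab-style quorum commit rule (Scala).
case class Proposal(epoch: Long, zxid: Long, data: Array[Byte])

class QuorumTracker(ensembleSize: Int) {
  private var acks = Map.empty[Long, Int]        // zxid -> acknowledgement count

  // Called when a follower acknowledges a proposal; returns true once a
  // strict majority of the ensemble (the leader's own vote included) has acked.
  def onAck(zxid: Long): Boolean = {
    val count = acks.getOrElse(zxid, 1) + 1      // 1 = the leader's own vote
    acks += zxid -> count
    count > ensembleSize / 2                     // strict majority commits
  }
}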

In addition to configuration management, distributed locks, and group membership, ZooKeeper primitives can be used to implement:

Double barriers enable clients to synchronize the beginning and the end of a computation. When enough processes, defined by the barrier threshold, have joined the barrier, the processes start their computation; each leaves the barrier once it has finished (see the sketch below).
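
A minimal double-barrier sketch using the raw ZooKeeper Java client from Scala. It polls for simplicity; a production recipe would use watches and handle connection loss, retries, and node-exists races. The paths and node names are illustrative:

// Assumes barrierPath already exists as a persistent znode.
import org.apache.zookeeper.{CreateMode, ZooDefs, ZooKeeper}

class DoubleBarrier(zk: ZooKeeper, barrierPath: String, threshold: Int, id: String) {
  private val myNode = s"$barrierPath/$id"

  def enter(): Unit = {
    // Register this participant as an ephemeral child of the barrier znode.
    zk.create(myNode, Array.empty[Byte],
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
    // Wait until enough participants have joined.
    while (zk.getChildren(barrierPath, false).size < threshold) Thread.sleep(100)
  }

  def leave(): Unit = {
    zk.delete(myNode, -1)                        // -1 skips the version check
    // Wait until every participant has removed its node.
    while (!zk.getChildren(barrierPath, false).isEmpty) Thread.sleep(100)
  }
}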

Sometimes in distributed systems it is not clear a priori what the final system configuration will look like. For example, a client may want to start a master process and several worker processes, but starting the processes is handled by a scheduler, so the client does not know ahead of time details such as the addresses and ports that it can give the worker processes so they can connect to the master. This scenario can be handled in ZooKeeper with a rendezvous znode, as sketched below.
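
A minimal rendezvous sketch: the master publishes its address under a well-known znode and workers block until it appears. The path /app/rendezvous and the host:port payload are illustrative assumptions:

import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

object Rendezvous {
  val Path = "/app/rendezvous"                   // assumed path; parent /app must exist

  // Master side: publish host:port for workers to find.
  def publish(zk: ZooKeeper, hostPort: String): Unit =
    zk.create(Path, hostPort.getBytes("UTF-8"),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)

  // Worker side: block until the rendezvous znode exists, then read it.
  def await(zk: ZooKeeper): String = {
    val created = new CountDownLatch(1)
    val watcher = new Watcher {
      override def process(e: WatchedEvent): Unit =
        if (e.getType == Watcher.Event.EventType.NodeCreated) created.countDown()
    }
    if (zk.exists(Path, watcher) == null) created.await()  // exists() also sets the watch
    new String(zk.getData(Path, false, null), "UTF-8")
  }
}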

ZooKeeper Zab protocol messages are encapsulated in a QuorumPacket:
class QuorumPacket {
    int type;        // Request, Ack, Commit, Ping, etc.
    long zxid;       // ZooKeeper transaction id of the proposal
    buffer data;     // serialized payload (jute "buffer" type)
    vector authinfo; // authentication info; only used for requests
}

The basic API for manipulating znodes (a short usage sketch follows the list).

create(path, data, flags): Creates a znode with path name path, stores data[] in it, and returns the name of the new znode. flags lets a client select the type of znode (regular or ephemeral) and set the sequential flag

delete(path, version): Deletes the znode path if that znode is at the expected version

getData(path, watch): Returns the data and meta-data, such as version information, associated with the znode

setData(path, data, version): Writes data[] to znode path if the version number is the current version of the znode

getChildren(path, watch): Returns the set of names of the children of a znode

sync(path): Waits for all updates pending at the start of the operation to propagate to the server that the client is connected to
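
A short usage sketch of these calls through the Java client from Scala. The connection string, paths, and payloads are illustrative:

import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}
import org.apache.zookeeper.data.Stat

val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
  override def process(e: WatchedEvent): Unit = ()         // no-op session watcher
})

// create: regular (persistent) znodes, one holding some configuration bytes
zk.create("/app", Array.empty[Byte], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)
zk.create("/app/config", "key=value".getBytes("UTF-8"),
  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT)

// getData: returns the data; the Stat is filled with metadata such as the version
val stat = new Stat()
val bytes = zk.getData("/app/config", false, stat)

// setData: conditional write, succeeds only if the version still matches
zk.setData("/app/config", "key=newValue".getBytes("UTF-8"), stat.getVersion)

// getChildren and delete (-1 skips the version check)
val children = zk.getChildren("/app", false)
zk.delete("/app/config", -1)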

Watches are used to monitor state changes. ZooKeeper watches are one-time triggers, and because of the latency between receiving a watch event and resetting the watch, a client can miss changes made to a znode during that interval. In a distributed application in which a znode changes multiple times between the dispatch of an event and the resetting of the watch, developers must handle such situations in the application logic; a common pattern is to re-register the watch inside the callback, as sketched below.
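
A sketch of re-registering a one-time watch: the callback re-reads the znode and sets a new watch each time it fires, so changes between the event and the re-read are picked up by the read itself. Illustrative only; a real client also handles session events, NodeDeleted, and connection loss:

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

def watchNode(zk: ZooKeeper, path: String)(onChange: Array[Byte] => Unit): Unit = {
  val watcher: Watcher = new Watcher {
    override def process(e: WatchedEvent): Unit =
      if (e.getType == Watcher.Event.EventType.NodeDataChanged) {
        // Re-read the data AND re-register this watcher in the same call.
        onChange(zk.getData(path, this, null))
      }                                          // other event types ignored in this sketch
  }
  onChange(zk.getData(path, watcher, null))      // initial read sets the first watch
}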

Spark settings can be set in a number of ways (examples follow the list):

  • $SPARK_HOME/conf/spark-defaults.conf – if the file does not exist, make a copy of spark-defaults.conf.template and edit the copy.
  • On the command line when submitting jobs or starting up the shell
  • Directly in code, on the SparkConf object used to create the SparkContext.
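
Examples of each, using spark.executor.memory with an illustrative value of 4g:

// 1. In $SPARK_HOME/conf/spark-defaults.conf:
//      spark.executor.memory   4g
// 2. On the command line:
//      spark-submit --conf spark.executor.memory=4g ...
// 3. In code, on the SparkConf used to build the SparkContext:
import org.apache.spark.{SparkConf, SparkContext}
val conf = new SparkConf().setAppName("example").set("spark.executor.memory", "4g")
val sc = new SparkContext(conf)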

Shuffling
Spark stores intermediate data from a shuffle operation on disk as part of its under-the-hood optimization. When Spark has to recompute a portion of an RDD graph, it may be able to truncate the lineage if the shuffle output is already on disk as a side effect of an earlier shuffle. This can happen even if the RDD is not cached or explicitly persisted. Set spark.shuffle.spill=false to turn spilling off if it is not needed.
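
A minimal way to set this, assuming an older Spark release where spark.shuffle.spill is still honored (later releases ignore it):

import org.apache.spark.SparkConf
// Disable spilling of shuffle data to disk during aggregations.
val conf = new SparkConf().set("spark.shuffle.spill", "false")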

Caching
The caching mechanism reserves a fraction of each executor's memory for cached data. This fraction is specified in spark.storage.memoryFraction.
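
For example, assuming sc is an existing SparkContext; the 0.5 fraction and input path are illustrative, and memoryFraction belongs to the legacy memory manager:

import org.apache.spark.{SparkConf, SparkContext}
// Give cached RDDs half of each executor's storage memory, then cache a dataset
// so that repeated actions reuse it instead of recomputing it.
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.5")
val sc = new SparkContext(conf)
val cached = sc.textFile("hdfs:///data/input").cache()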

Partitioning
Use more partitions as the data size increases, so that individual tasks stay small and work is spread across the cluster.
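
For example, assuming sc is an existing SparkContext; the path and partition count are illustrative:

// repartition() does a full shuffle to the requested number of partitions;
// coalesce() can shrink the partition count without a shuffle.
val big = sc.textFile("hdfs:///data/large")
val spread = big.repartition(200)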

Broadcast Variables
Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
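
A minimal sketch, assuming sc is an existing SparkContext; the lookup table is illustrative:

// The lookup table is shipped to each executor once and reused by every task,
// instead of being serialized into every task closure.
val lookup = Map("a" -> 1, "b" -> 2)
val bLookup = sc.broadcast(lookup)
val coded = sc.parallelize(Seq("a", "b", "c")).map(k => bLookup.value.getOrElse(k, 0))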

Memory Leaks
Closing over objects in lambdas can cause memory leaks, because the enclosing object is serialized into every task. Check the size of the serialized Spark task to make sure there are no leaks (see the example below).
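
A common instance of the problem and the usual fix; the class and field names are illustrative:

import org.apache.spark.rdd.RDD

class Processor(val lookup: Map[String, Int]) {
  // Problematic: the lambda references the field via `this`, so the whole
  // Processor object is serialized into every task.
  def badCount(rdd: RDD[String]): RDD[Int] =
    rdd.map(k => lookup.getOrElse(k, 0))

  // Better: copy only the needed field into a local value; just the map is captured.
  def goodCount(rdd: RDD[String]): RDD[Int] = {
    val localLookup = lookup
    rdd.map(k => localLookup.getOrElse(k, 0))
  }
}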

Sometimes the logging output from spark-shell can get annoying, or your results can get lost in the output.

Use the instructions below to set the logging level

  • In the conf folder, find log4j.properties.template
  • Make a copy of this file and rename it to log4j.properties
  • Edit the file, replacing INFO with WARN or ERROR (see the example line below)
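
After the change, the relevant line typically looks like this (the console appender name comes from the template):

# Log only warnings and errors to the console instead of INFO-level output
log4j.rootCategory=WARN, console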