Converting flat data to complex JSON in PySpark

 

We will convert table data with repeating rows for an employee into nested JSON using Spark.

You can create a CSV file like the one below.

EmployeeID,Name,Color
1,Guru,Red
1,Guru,Green
2,Gova,Red
2,Giva,Yellow

You can import the data into the Databricks file system. I have imported it and wrote the code below to validate that it is there.


spark.read.text("/FileStore/tables/EmpData.csv").show()

Below is the output:

+--------------------+
|               value|
+--------------------+
|EmployeeID,Name,C...|
|          1,Guru,Red|
|        1,Guru,Green|
|          2,Gova,Red|
|       2,Giva,Yellow|
+--------------------+

Code to read the CSV and create a DataFrame

Pschema = "EmployeeID INTEGER, EmpName string, Color string"
srcDF = spark.read\
        .option("header","true")\
        .schema(Pschema)\
        .csv("/FileStore/tables/EmpData.csv")

Code to store the data as nested JSON

from pyspark.sql.functions import * 

df = srcDF.groupBy("EmployeeID","EmpName").agg(collect_list("Color").alias("ColorFile"))

df.write\
  .format("json")\
  .mode("overwrite")\
  .save("/FileStore/tables/EmpData.json")
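Each line of the output file contains one JSON document per group. For the sample data above, the output should look roughly like the following (the order of the rows, and of the colors inside ColorFile, may vary):

{"EmployeeID":1,"EmpName":"Guru","ColorFile":["Red","Green"]}
{"EmployeeID":2,"EmpName":"Gova","ColorFile":["Red"]}
{"EmployeeID":2,"EmpName":"Giva","ColorFile":["Yellow"]}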

Code to read the JSON data and convert it back to a flat DataFrame

from pyspark.sql import functions as f

df = spark.read.json("/FileStore/tables/EmpData.json")
df.show()
df2 = df.select("EmployeeID","EmpName",f.explode(f.col("ColorFile")).alias("Color"))
df2.show()
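For the sample data, the flattened DataFrame df2 should show something like this (row order may vary):

+----------+-------+------+
|EmployeeID|EmpName| Color|
+----------+-------+------+
|         1|   Guru|   Red|
|         1|   Guru| Green|
|         2|   Gova|   Red|
|         2|   Giva|Yellow|
+----------+-------+------+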

Types of Analytics: descriptive, prescriptive, predictive, and inferential analytics

TYPES OF ANALYTICS

The process of discovering meaningful patterns or information from a set of data is called analytics. Another question we always hear is: what is the difference between data and information? Data can be termed as the raw records stored in a system, such as transactions, while information is the inference drawn from data through calculation, e.g. the quarterly sales of Maruti Suzuki cars for 2019.

So analytics can be of four types:

  • Descriptive
  • Prescriptive
  • Predictive
  • Inferential

Descriptive analytics

Descriptive analytics is a very simple form of data analysis, like sales by location for the year 2019. It examines historical data and answers questions like: How were my sales in the year 2019? Which locations had better sales? Which products were sold and for what amount?

Descriptive analytics is the interpretation of historical data to better understand changes that have occurred in a business.

Prescriptive Analytics

Prescriptive analytics is the area of data analytics that focuses on finding the best course of action in a scenario given the available data. It emphasizes actionable insights instead of the data monitoring of descriptive analytics.
So basically, once we find that the business was not good in the 2nd quarter of 2019, we get into details such as which products and which days had the issue, and we start doing root cause analysis and course correction to improve the situation.

Predictive Analytics

In predictive analytics we try to predict the future by reading past data. For example, given past credit card transactions, we try to predict whether a future transaction is fraudulent, or we predict the future price of real estate by looking at past behavior.

Inferential analytics

Inferential analytics consists of techniques that allow us to use samples to make generalizations about the populations from which the samples were drawn. It is therefore important that the sample accurately represents the population. The most common example would be an exit poll in India.

SendGrid email setup


Create SendGrid Account

Azure customers can unlock 25,000 free emails each month. These 25,000 free monthly emails will give you access to advanced reporting and analytics and all APIs (Web, SMTP, Event, Parse and more). For information about additional services provided by SendGrid, visit the SendGrid Solutions page.

To sign up for a SendGrid account

1. Sign in to the Azure portal.
2. In the menu on the left, click Create a resource.
3. Click Add-ons and then SendGrid Email Delivery.
4. Complete the signup form and select Create.
5. Enter a Name to identify your SendGrid service in your Azure settings. Names must be between 1 and 100 characters in length and contain only alphanumeric characters, dashes, dots, and underscores. The name must be unique in your list of subscribed Azure Store Items.
6. Enter and confirm your Password.
7. Choose your Subscription.
8. Create a new Resource group or use an existing one.
9. In the Pricing tier section, select the SendGrid plan you want to sign up for.
10. Enter a Promotion Code if you have one.
11. Enter your Contact Information.
12. Review and accept the Legal terms.
13. After confirming your purchase, you will see a Deployment Succeeded pop-up and your account will be listed in the All resources section.

After you have completed your purchase and clicked the Manage button to initiate the email verification process, you will receive an email from SendGrid asking you to verify your account. If you do not receive this email, or have problems verifying your account, please see this FAQ.

You can only send up to 100 emails/day until you have verified your account.
To modify your subscription plan or see the SendGrid contact settings, click the name of your SendGrid service to open the SendGrid Marketplace dashboard.

To send an email using SendGrid, you must supply your API Key.

To find your SendGrid API Key

1. Click Manage.
2. In your SendGrid dashboard, select Settings and then API Keys in the menu on the left.
3. Click Create API Key.
4. At a minimum, provide a Name for this key, grant Full Access to Mail Send, and select Save.
5. Your API key will be displayed only once at this point, so be sure to store it safely.
6. The key needs to be placed in Key Vault so that the app can load it (SecretName = EmailAPIKey).
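Once the key is in Key Vault, a minimal Python sketch for sending a test email could look like the following. It assumes the sendgrid, azure-identity, and azure-keyvault-secrets packages are installed; the vault URL and the sender/recipient addresses are placeholders you would replace.

# Minimal sketch: load the API key from Key Vault and send a test email
from azure.identity import DefaultAzureCredential
from azure.keyvault.secrets import SecretClient
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail

# Load the API key from Key Vault (SecretName = EmailAPIKey, as noted above)
vault_url = "https://<your-key-vault-name>.vault.azure.net"   # placeholder
secret_client = SecretClient(vault_url=vault_url, credential=DefaultAzureCredential())
api_key = secret_client.get_secret("EmailAPIKey").value

# Build and send a simple test message
message = Mail(
    from_email="sender@example.com",     # placeholder, must be a verified sender
    to_emails="recipient@example.com",   # placeholder
    subject="SendGrid test from Azure",
    html_content="<p>Hello from SendGrid!</p>")

sg = SendGridAPIClient(api_key)
response = sg.send(message)
print(response.status_code)              # 202 means the mail was accepted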


Creating free databricks cluster

Databricks community cloud

You can use Databricks Community Cloud to create a free Databricks cluster. The cluster you create will have 1 node only. You can run Spark and Scala code on the cluster, and you can mount storage and external file systems like Azure Data Lake and Azure Storage.

Steps to create a free cluster:

  1. Navigate to https://community.cloud.databricks.com/login.html
  2. Click on Sign Up; you will be redirected to https://databricks.com/try-databricks
  3. Click on the Get Started icon below the Community Edition: https://databricks.com/signup/signup-community
  4. Fill in all the required details and continue using the cluster.
You can run your code and save your notebooks. Your cluster automatically gets dropped after 120 minutes of inactivity. You can create a new cluster and attach it to your notebook to run your code.

Spark SQL basics

Spark SQL

Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch back and forth between different APIs based on which provides the most natural way to express a given transformation.
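As a small illustration (assuming a SparkSession named spark and a tiny made-up people dataset), the same query can be expressed either with SQL or with the DataFrame API, and both run on the same execution engine:

from pyspark.sql import functions as F

# A tiny hypothetical dataset for illustration
people = spark.createDataFrame(
    [("Guru", 34), ("Gova", 28), ("Giva", 45)],
    ["name", "age"])
people.createOrReplaceTempView("people")

# 1) SQL API
spark.sql("SELECT name FROM people WHERE age > 30").show()

# 2) DataFrame API - same result, same execution engine
people.filter(F.col("age") > 30).select("name").show()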

Creating free databricks cluster

You can create a free Spark cluster on Databricks Community Cloud. You can sign up there and start creating your cluster. Your cluster automatically gets dropped after 120 minutes of inactivity, so you can create a new cluster every time you work. The cluster you create on the community cloud will have only one node and is useful for learning purposes only. (Create Cluster)

Mapping Azure storage and querying data

There are two ways of accessing data from Azure storage:
  • Mount the storage in the Databricks file system (a minimal sketch is shown below).
  • Use the Spark conf context to access the storage.
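A minimal sketch of the first option, mounting an Azure Blob Storage container with the Databricks utilities, could look like this; the account, container, mount point, and access key are placeholders:

# Sketch: mount an Azure Blob Storage container in the Databricks file system
# (account name, container, mount point, and access key below are placeholders)
dbutils.fs.mount(
    source="wasbs://<container>@<account>.blob.core.windows.net",
    mount_point="/mnt/<mount-name>",
    extra_configs={"fs.azure.account.key.<account>.blob.core.windows.net": "<access-key>"})

# Once mounted, the data can be read like a local path
df = spark.read.csv("/mnt/<mount-name>/Sales.csv", header="true", inferSchema="true")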
Here we will access CSV data using the Spark conf context and a DataFrame.

Configuring Azure storage using PySpark

You can use the code below to configure Azure storage. (Steps to create an Azure storage account)


account = "vehicleparking"
container = "salesdata"
spark.conf.set("fs.azure.account.key.vehicleparking.blob.core.windows.net", "")
path = "wasbs://" + container + "@" + account + ".blob.core.windows.net/Sales.csv"

Code to read the CSV from Azure Blob storage

ds = spark.read.csv(path, header="true", inferSchema="true")
ds.createOrReplaceTempView("SalesData")
mydata = spark.sql("SELECT * FROM SalesData WHERE City = 'Hyderabad'")
mydata.show()

Your overall code looks like the below:
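Putting the two snippets above together in one notebook cell (the storage account key is left blank here and must be filled in with key1/key2 from the Access keys blade):

# Combined cell: configure access to the storage account and query the CSV
account = "vehicleparking"
container = "salesdata"

# Paste key1/key2 from the storage account's Access keys blade here
spark.conf.set("fs.azure.account.key.vehicleparking.blob.core.windows.net", "")

path = "wasbs://" + container + "@" + account + ".blob.core.windows.net/Sales.csv"

ds = spark.read.csv(path, header="true", inferSchema="true")
ds.createOrReplaceTempView("SalesData")
mydata = spark.sql("SELECT * FROM SalesData WHERE City = 'Hyderabad'")
mydata.show()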


Creating storage account in azure

How to create an Azure storage account?

Prerequisite: You must have an Azure subscription.
Below are the steps to create an Azure storage account.
  • Log in to the Azure Portal.
  • Click on the +Create Resource icon.
  • On the next page, click on "Storage account - blob, file, table, queue".
  • Select the subscription details and fill in the account name; the name must be in lowercase letters, start with a letter, and be unique.
  • There are other options like the access tier (Hot/Cool) and the account kind (V2/V1); you can fill them in according to your requirements.
  • Once you have filled in all the details, click on "Review + create".

Creating containers in storage account

Navigate to Blob service, click on Containers, then click on +Container and follow the steps.

Collecting the access key

Navigate to the storage account you created in the earlier step. You can view all the basic details on the Overview tab. Navigate to the Settings section and click on "Access keys". From this window you can copy key1/key2 for future use.

Introduction to Apache Spark

Introduction to Apache Spark

Spark is a general distributed data processing engine built for speed, ease of use, and flexibility. The combination of these three properties is what makes Spark so popular and widely adopted in the industry.
In terms of flexibility, Spark offers a single unified data processing stack that can be used to solve multiple types of data processing workloads, including batch processing, interactive queries, iterative processing needed by machine learning algorithms, and real-time streaming processing to extract actionable insights at near real-time.
A big data ecosystem consists of many pieces of technology, including a distributed storage engine called HDFS, a cluster management system to efficiently manage a cluster of machines, and different file formats to store a large amount of data efficiently in binary and columnar formats. Spark integrates really well with the big data ecosystem, which is another reason why Spark adoption has been growing at a really fast pace. Another really cool thing about Spark is that it is open source; therefore, anyone can download the source code to examine it, to figure out how a certain feature was implemented, or to extend its functionality. In some cases, this can dramatically help with reducing the time to debug problems.

Spark core concept and architecture

The Spark core architecture includes:
  • Spark clusters
  • The resource management system 
  • Spark applications 
  • Spark drivers 
  • Spark executors

Spark Cluster and resource management system

Spark is essentially a distributed system that was designed to process large volumes of data efficiently and quickly. This distributed system is typically deployed onto a collection of machines, known as a Spark cluster. A cluster can be as small as a few machines or as large as thousands of machines.
To efficiently and intelligently manage a collection of machines, companies rely on a resource management system such as Apache YARN or Apache Mesos. We can see the master-slave pattern in this design. The two main components in a typical resource management system are the cluster manager (master) and the worker (slave). The cluster manager knows where the workers are located, how much memory they have, and the number of CPU cores each one has. One of the main responsibilities of the cluster manager is to orchestrate the work by assigning it to each worker. Each worker offers resources (memory, CPU, etc.) to the cluster manager and performs the assigned work.

Spark Application

A Spark application consists of two parts. The first is the application data processing logic expressed using Spark APIs, and the other is the Spark driver. The application data processing logic can be as simple as a few lines of code to perform a few data processing operations or can be as complex as training a large machine learning model that requires many iterations and could run for many hours to complete. The Spark driver is the central coordinator of a Spark application, and it interacts with a cluster manager to figure out which machines to run the data processing logic on. For each one of those machines, the Spark driver requests that the cluster manager launch a process called the Spark executor. Another important job of the Spark driver is to manage and distribute Spark tasks onto each executor on behalf of the application. If the data processing logic requires the Spark driver to display the computed results to a user, then it will coordinate with each Spark executor to collect the computed results and merge them together. The entry point into a Spark application is a class called SparkSession, which provides facilities for setting up configurations as well as APIs for expressing data processing logic.
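As a small illustration of that entry point, here is a minimal sketch of creating a SparkSession in PySpark; the application name and config value are arbitrary examples:

from pyspark.sql import SparkSession

# Build (or get) the SparkSession - the entry point for configuration
# and for the data processing APIs
spark = SparkSession.builder \
    .appName("MyFirstSparkApp") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# A trivial piece of data processing logic that the driver will distribute to executors
df = spark.range(0, 1000)
print(df.count())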

Spark driver and executor

Each Spark executor is a JVM process and is exclusively allocated to a specific Spark application. This was a conscious design decision to avoid sharing a Spark executor between multiple Spark applications, in order to isolate them from each other so one badly behaving Spark application wouldn't affect other Spark applications. The lifetime of a Spark executor is the duration of a Spark application, which could run for a few minutes or for a few days. Since Spark applications run in separate Spark executors, sharing data between them requires writing the data to an external storage system like HDFS. As depicted in the figure above, Spark employs a master-slave architecture, where the Spark driver is the master and the Spark executor is the slave. Each of these components runs as an independent process on a Spark cluster. A Spark application consists of one and only one Spark driver and one or more Spark executors. Playing the slave role, each Spark executor does what it is told, which is to execute the data processing logic in the form of tasks. Each task is executed on a separate CPU core. This is how Spark can speed up the processing of a large amount of data by processing it in parallel. In addition to executing assigned tasks, each Spark executor has the responsibility of caching a portion of the data in memory and/or on disk when it is told to do so by the application logic.
At the time of launching a Spark application, you can request how many Spark executors an application needs and how much memory and the number of CPU cores each executor should have. Figuring out an appropriate number of Spark executors, the amount of memory, and the number of CPU requires some understanding of the amount of data that will be processed, the complexity of the data processing logic, and the desired duration by which a Spark application should complete the processing logic.
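For example, executor resources can be requested through configuration when building the SparkSession. A hedged sketch in PySpark follows; the numbers are arbitrary and the right values depend on your cluster manager, data volume, and workload:

from pyspark.sql import SparkSession

# Illustrative resource requests - the actual values depend on data volume,
# the complexity of the processing logic, and the desired completion time
spark = SparkSession.builder \
    .appName("ResourceSizedApp") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()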

Apache spark ecosystem components

Apache Spark's promise of faster data processing and simpler development is possible only because of its components, which address many of the issues that arose while using Hadoop MapReduce. Now, let's talk about each Spark ecosystem component one by one.

1. Apache Spark Core


Spark Core is the general execution engine for the Spark platform that all other functionality is built upon. It provides in-memory computing and can reference datasets in external storage systems.
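A tiny sketch of the Spark Core layer in PySpark, using the low-level RDD API that the higher-level components build on (it assumes an existing SparkSession named spark; the numbers are arbitrary):

# Low-level RDD example on Spark Core
rdd = spark.sparkContext.parallelize(range(1, 11))

# Transformations are lazy; the action collect() triggers the in-memory computation
squares = rdd.map(lambda x: x * x).filter(lambda x: x % 2 == 0)
print(squares.collect())   # [4, 16, 36, 64, 100]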

2. Spark SQL

Spark SQL is a component on top of Spark Core that introduces a data abstraction called SchemaRDD (now the DataFrame), which provides support for structured and semi-structured data.

3. Spark Streaming

Spark Streaming uses Spark Core's fast scheduling capability to perform streaming analytics. It ingests data in mini-batches and performs RDD (Resilient Distributed Dataset) transformations on those mini-batches of data.
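A minimal DStream-style sketch of that mini-batch model; it assumes an existing SparkSession named spark and a text stream on localhost:9999 (for example, from nc -lk 9999):

from pyspark.streaming import StreamingContext

# Mini-batches of 5 seconds; each batch becomes an RDD that we transform
ssc = StreamingContext(spark.sparkContext, batchDuration=5)
lines = ssc.socketTextStream("localhost", 9999)

counts = (lines.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word, 1))
               .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()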

4. MLlib (Machine Learning Library)

MLlib is a distributed machine learning framework on top of Spark, built on the distributed memory-based Spark architecture. According to benchmarks done by the MLlib developers against Alternating Least Squares (ALS) implementations, Spark MLlib is nine times as fast as the Hadoop disk-based version of Apache Mahout (before Mahout gained a Spark interface).
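Since ALS is mentioned above, here is a hedged MLlib sketch; the tiny ratings dataset is made up purely for illustration, and it assumes an existing SparkSession named spark:

from pyspark.ml.recommendation import ALS

# Made-up ratings: (userId, itemId, rating)
ratings = spark.createDataFrame(
    [(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 5.0), (2, 0, 1.0), (2, 2, 4.0)],
    ["userId", "itemId", "rating"])

als = ALS(userCol="userId", itemCol="itemId", ratingCol="rating",
          rank=5, maxIter=5, regParam=0.1)
model = als.fit(ratings)

# Top-2 item recommendations per user
model.recommendForAllUsers(2).show(truncate=False)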

5. GraphX

GraphX is Spark's distributed graph-processing framework. It provides an API for expressing graph computation that can model user-defined graphs by using the Pregel abstraction API. It also provides an optimized runtime for this abstraction.

6. SparkR

SparkR is an R package that provides a lightweight frontend for using Apache Spark from R. It enables data scientists to analyze large datasets and to run jobs on them interactively from the R shell. The main idea behind SparkR was to explore different techniques to combine the usability of R with the scalability of Spark.


T-SQL LEAD LAG and SUM function based query

  Query on T-SQL window clause Below is the sales table Order_Date Name Product SubCategory ...