Azure Stream Analytics & Machine Learning Integration With RealTime Twitter Sentiment Analytics Dashboard on PowerBI


Recently, it has been introduced the integration of ASA & AML available as preview update & it’s possible to add AML web service URL & API key as ‘custom function‘ with ASA input. In this demo, realtime tweets are collected based on keywords like ‘#HappyHolidays2016‘, ‘#MerryChristmas‘, ‘#HappyNewYear2016‘ & those are directly stored on a .csv file saved on OneDrive. Here goes the solution architecture diagram of the POC.

SolutionArc

 

 

Now, add the Service Bus event hub endpoint as input to the ASA job, while deploy the ‘Twitter Predictive Sentiment Analytics Model‘  & click on ‘Open in Studio‘ to start deploy the model. Don’t forget to run the solution before deploying.

AML

 

Once the model is deployed, open the ‘Web Service‘ dashboard page to get the model URL & API key, click on default endpoint -> download the excel 2010 or earlier apps. Collect the URL & API key to apply it to ASA function credentials for AML deployment.

DeployedAML

Next, create an ASA job & add the event hub credentials where the real world tweets are getting pushed & click on ‘Functions‘ tab of ASA job to add the AML credentials. Provide model name, URL & API key of the model & Once, it’s added, click on Save.

ASA-Functions

 

Now, add the following ASA SQL to aggregate the realtime tweets sentiment scores coming out from predictive twitter sentiment model.

Query

 

Provide the output as Azure Blob storage, add a container name & serialization type as CSV & start the ASA job. Also, start importing data into PowerBI desktop from the ASA output Azure blob storage account.

Output

 

 

PowerBI desktop contains in-built power Query to start preparing the ASA output data & processing data types. Choose the AML model sentiment score datatype as decimal type & TweetTexts as Text(String) type.

PBI-AML

 

Start building the ‘Twitter Sentiment Analytics‘ dashboard powered by @AzureStreaming & Azure Machine Learning API with realworld tweet streaming, there’re some cool custom visuals are available on PowerBI.  I’ve used some visuals here like ‘wordcloud‘ chart which depicts some of the highly scored positive sentiment contained tweets with most specific keywords like ‘happynewyear2016‘, ‘MerryChristmas‘,’HappyHolidays‘ etc.

PBI-visuals

 

While, in the donut chart, the top 10 tweets with most positive sentiment counts are portrayed with the specific sentiment scores coming from AML predictive model experiment integrated with ASA jobs.

PBI-dashboard

~Wish you HappyHolidays 2016!

A lap around Microsoft Azure IoT Hub with Azure Stream Analytics & IoT Analytics Suite


Last month on #AzureConf 2015, the Azure IoT Suite has been announced to be available for purchase along with the GA release of Azure IoT Hub. The IoT Hub helps to control, monitor & connect thousands of devices to communicate via cloud & talk to each other using suitable protocols. You can connect to your Azure IoT Hub using the IoT Hub SDKs available in different languages like C, C#, Java, Ruby etc. Also, there’re monitoring devices available like device explorer or iothub-explorer. In this demo, Weather Data Analytics is demonstrated using Azure IoT Hub with Stream Analytics powered by Azure IoT Suite & visualized using Azure SQL database with PowerBI.

You can provision your own device into Azure IoT analytics Suite using device explorer or iothub-explorer tool & start bi-directional communication through device-cloud & cloud-device.

First, create your Azure IoT Hub from Azure Preview Portal  by selecting New-> Internet of Things -> Azure IoT Hub. Provide hub name, select pricing & scale tier[F1 – free(1/subscription, connect 10 devices, 3000 messages /day), [S1 – standard (50,000 messages/day) & S2- standard(1.5 M messages/day)] for device to cloud communication. Select IoT Hub units, device to cloud partitions, resource group, subscription & finally location of deployment(currently it’s available only in three locations- ‘East Asia’, ‘East US’, ‘North Europe’.

 

IoThubcreate

 

Once the hub is created, next switch to device explorer to start creating a device, for details about to create a device & register, refer to this Github page. After registering the device, move back to  ‘Data‘ tab of device explorer tool & click on ‘Monitor‘ button to start receive device-cloud events sent to Azure IoT Hub from device.

DeviceExplorer

 

The schema for the weather dataset looks like the following data & fresh data collected from various sensors & feed into Azure IoT Hub which can be viewed using Device Explorer tool.

DataSchema

 

In order to push data from weather data sensor device to Azure IoT hub, the following code snippet needs to be used. The full code-snipped is going to be available on my Github page.

 

using System;
using System.Text;
using System.Threading.Tasks;
using System.IO;
using System.Data;
using Newtonsoft.Json;
using Microsoft.VisualBasic;
using Microsoft.VisualBasic.FileIO;

namespace Microsoft.Azure.Devices.Client.Samples
{
class Program
{
private const string DeviceConnectionString = “Your device connection-string”;
private static int MESSAGE_COUNT = 5;
static string data = string.Empty;

static void Main(string[] args)
{
try
{
DeviceClient deviceClient = DeviceClient.CreateFromConnectionString(DeviceConnectionString);

if (deviceClient == null)
{
Console.WriteLine(“Failed to create DeviceClient!”);
}
else
{
SendEvent(deviceClient).Wait();
ReceiveCommands(deviceClient).Wait();
}

Console.WriteLine(“Exited!\n”);
}
catch (Exception ex)
{
Console.WriteLine(“Error in sample: {0}”, ex.Message);
}
}

static async Task SendEvent(DeviceClient deviceClient)
{
string[] filePath = Directory.GetFiles(@”\Weblog\”,”*.csv”);
string csv_file_path = string.Empty;
int size = filePath.Length;
for(int i=0; i< size; i++)
{
Console.WriteLine(filePath[i]);
csv_file_path = filePath[i];
}

DataTable csvData = GetDataTableFromCSVFile(csv_file_path);
Console.WriteLine(“Rows count:” + csvData.Rows.Count);
DataTable table = csvData;
foreach(DataRow row in table.Rows)
{
foreach(var item in row.ItemArray)
data = item.ToString();
Console.Write(data);

try
{
foreach(DataRow rows in table.Rows)
{
var info = new WeatherData
{
weatherDate = rows.ItemArray[0].ToString(),
weatherTime = rows.ItemArray[1].ToString(),
apperantTemperature = rows.ItemArray[2].ToString(),
cloudCover = rows.ItemArray[3].ToString(),
dewPoint = rows.ItemArray[4].ToString(),
humidity = rows.ItemArray[5].ToString(),
icon = rows.ItemArray[6].ToString(),
pressure = rows.ItemArray[7].ToString(),
temperature = rows.ItemArray[8].ToString(),
timeInterval = rows.ItemArray[9].ToString(),
visibility = rows.ItemArray[10].ToString(),
windBearing = rows.ItemArray[11].ToString(),
windSpeed = rows.ItemArray[12].ToString(),
latitude = rows.ItemArray[13].ToString(),
longitude = rows.ItemArray[14].ToString()
};

var serializedString = JsonConvert.SerializeObject(info);
var message = data;
Console.WriteLine(“{0}> Sending events: {1}”, DateTime.Now.ToString(), serializedString.ToString());
await deviceClient.SendEventAsync(new Message(Encoding.UTF8.GetBytes(serializedString.ToString())));
}
}

catch(Exception ex)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(“{0} > Exception: {1}”, DateTime.Now.ToString(), ex.Message);
Console.ResetColor();
}
// Task.Delay(200);

}

Console.WriteLine(“Press Ctrl-C to stop the sender process”);
Console.WriteLine(“Press Enter to start now”);
Console.ReadLine();

//string dataBuffer;

//Console.WriteLine(“Device sending {0} messages to IoTHub…\n”, MESSAGE_COUNT);

//for (int count = 0; count < MESSAGE_COUNT; count++)
//{
// dataBuffer = Guid.NewGuid().ToString();
// Message eventMessage = new Message(Encoding.UTF8.GetBytes(dataBuffer));
// Console.WriteLine(“\t{0}> Sending message: {1}, Data: [{2}]”, DateTime.Now.ToLocalTime(), count, dataBuffer);

// await deviceClient.SendEventAsync(eventMessage);
//}
}

private static DataTable GetDataTableFromCSVFile(string csv_file_path)
{
DataTable csvData = new DataTable();
string data = string.Empty;
try
{
using (TextFieldParser csvReader = new TextFieldParser(csv_file_path))
{
csvReader.SetDelimiters(new string[] { “,” });
csvReader.HasFieldsEnclosedInQuotes = true;

//read column names
string[] colFields = csvReader.ReadFields();
foreach (string column in colFields)
{
DataColumn datecolumn = new DataColumn(column);
datecolumn.AllowDBNull = true;
csvData.Columns.Add(datecolumn);
}
while (!csvReader.EndOfData)
{
string[] fieldData = csvReader.ReadFields();

for (int i = 0; i < fieldData.Length; i++)
{
if (fieldData[i] == “”)
{
fieldData[i] = null;
}
}
csvData.Rows.Add(fieldData);

}
}
}
catch (Exception ex)
{
Console.WriteLine(“Exception” + ex.Message);
}
return csvData;
}

static async Task ReceiveCommands(DeviceClient deviceClient)
{
Console.WriteLine(“\nDevice waiting for commands from IoTHub…\n”);
Message receivedMessage;
string messageData;

while (true)
{
receivedMessage = await deviceClient.ReceiveAsync(TimeSpan.FromSeconds(1));

if (receivedMessage != null)
{
messageData = Encoding.ASCII.GetString(receivedMessage.GetBytes());
Console.WriteLine(“\t{0}> Received message: {1}”, DateTime.Now.ToLocalTime(), messageData);

await deviceClient.CompleteAsync(receivedMessage);
}
}
}
}
}

You could check output to events sending from device to cloud on console.

dataconsole

Next, start pushing the device data into Azure IoT Hub & monitor the events receiving process through device explorer. Now, start provisioning an Azure Stream Analytics Job on Azure portal. Provide ‘Azure IoT Hub‘ as an input to the job like as the followings.

SAJob

 

input

IoTHubinput

Now provide Azure Stream Analytics Query to connect incoming unstructured datasets from device to cloud to pass into Azure SQL database. So, first, provision a SQL database on Azure & connect to as output to Stream Analytics job.

create table input(
weatherDate nvarchar(max),
weatherTime datetime,
apperantTemperature nvarchar(max),
cloudCover nvarchar(max),
dewPoint nvarchar(max),
humidity nvarchar(max),
icon nvarchar(max),
pressure nvarchar(max),
temperature nvarchar(max),
timeInterval nvarchar(max),
visibility nvarchar(max),
windBearing nvarchar(max),
windSpeed nvarchar(max),
latitude nvarchar(max),
longitude nvarchar(max)
)
select input.weatherDate, input.weatherTime,input.apperantTemperature,input.cloudCover,
input.dewPoint, input.humidity,input.icon,input.pressure,count(input.temperature) as avgtemperature, input.timeInterval, input.visibility, input.windBearing,
input.windSpeed,input.latitude,input.longitude

into weathersql
from input
group by input.weatherDate, input.weatherTime, input.apperantTemperature,input.cloudCover,
input.dewPoint, input.humidity,input.icon, input.pressure,input.timeInterval,input.visibility, input.windBearing,
input.windSpeed,input.latitude,input.longitude, TumblingWindow(second,2)

ASA-sql

Specify the output of ‘WeatherIoT’ ASA job as ‘Azure SQL Database‘, alternatively, you can select any of the rest of the connectors like ‘Event Hub’, ‘DocumentDB’ etc.

SAOutput

 

Make sure that , to create the necessary database & table first on SQL before adding as output to ASA job. For this demo, I have created the ‘weatheriot‘ table on Azure SQL database. The t-sql query looks like this.

iotsql

 

Next, start the ASA job & receive the final Azure IoT hub(device to cloud) data processed to IoT hub ->ASA -> Azure SQL database pipeline. Once you receive data on your Azure SQL table. Start building the PowerBI ‘Weather IoT Data Analytics’ dashboard for visualization & to leverage the power of Azure IoT momentum.

SQLoutput

Connect to PowerBI connected through same account of Azure subscription where you provisioned the ASA job & start importing data from Azure SQL database. Create stunning reports using funnel, donut, global map charts with live data refresh.

WeatherData

For this demo, I’ve populated charts on average weather temperature, pressure, humidity, dew point forecasting analysis over specific areas based on latitude & longitude values, plotted & pinned into PowerBI ‘Weather Data Azure IoT Analytics’ dashboard.

WeatherData-analysis