Pushing realtime Sensors data into ASA & visualize into Near Real-Time (NRT) PowerBI dashboard– frontier of IoT


As per as the last demo on IoT foundation stuffs, we’ve seen how it’s possible to leverage the real-time data insights from social media datasets like Twitter with some keywords. In this demo, we are trying to pushing realtime sensors data from Windows Phone device to Azure Stream Analytics (through Service Bus EventHub channels) & after processing in ASA hub publishing out to realtime PowerBI dashboard or near real-time analytics(NRT) on PowerView for Excel by pushing out ASA events to Azure SQL database through Excel PowerQuery.

An overview of n-tier architecture of  ASA on IoT foundation is like this:

ASA-blog

 

While, IoT always enables customers to connect their own device on Azure cloud platform & bring out some real business value from it, whether it produces #BigData or #SmallData.

Another topic is pretty important is to get insights from Weblogs or telemetry data which can bring out good sentiment, click stream analytics values with machine learning.

Here goes a good high level discussion from IoT team.

Coming back to the demo, so, first implemented a sample app for generating Accelerometer 3D events (X, Y, Z) on Windows Phone & Windows Store devices(Universal app) & pushing the generated events as block blob to Azure Service Bus Event Hub.

Attached sample code snippet.

private async void ReadingChanged(object sender, AccelerometerReadingChangedEventArgs e)
{

await Dispatcher.RunAsync(CoreDispatcherPriority.Normal, () =>
{
AccelerometerReading reading = e.Reading;
ScenarioOutput_X.Text = String.Format(“{0,5:0.00}”, reading.AccelerationX);
ScenarioOutput_Y.Text = String.Format(“{0,5:0.00}”, reading.AccelerationY);
ScenarioOutput_Z.Text = String.Format(“{0,5:0.00}”, reading.AccelerationZ);
i++;

//Coordinate_X = String.Format(“{0,5:00.00}”,Coordinate_X + ScenarioOutput_X.Text);
//Coordinate_Y = String.Format(“{0,5:00.00}”, Coordinate_Y + ScenarioOutput_Y.Text);
//Coordinate_Z = String.Format(“{0,5:00.00}”, Coordinate_Z + ScenarioOutput_Z.Text);
dataDetails = i +”,”+ reading.AccelerationX + “,” + reading.AccelerationY + “,” + reading.AccelerationZ;

NewDataFile += Environment.NewLine + dataDetails;

});
CloudStorageAccount storageAccount = CloudStorageAccount.Parse(“DefaultEndpointsProtocol=https;AccountName=yourazurestorageaccountname;

AccountKey=yourazurestorageaccountkey”);

CloudBlobClient blobClient = storageAccount.CreateCloudBlobClient();

CloudBlobContainer container = blobClient.GetContainerReference(“accelerometer”);
await container.CreateIfNotExistsAsync();
//if (x == false)
//{
// await container.CreateAsync();
//}

CloudBlockBlob blockBlob = container.GetBlockBlobReference(newFileName);
// bool y = await blockBlob.ExistsAsync();
//if (!blockBlob.Equals(newFileName))
//{
container.GetBlockBlobReference(newFileName);
// await blockBlob.UploadTextAsync(dataDetails);

await blockBlob.UploadTextAsync(Headers + Environment.NewLine+ NewDataFile);
}

 

You can download the whole visual studio solution on Github.

BUILD-Kevin-thumbnail-IoT

Next challenge as usual is to send real sensor events to event hubs with accurate consumer key & publish millions of events to event hub at a time.

Here goes sample code snippet.

class Program
{
static string eventHubName = “youreventhubname”;
static string connectionString = GetServiceBusConnectionString();
static string data = string.Empty;
static void Main(string[] args)
{

string csv_file_path = string.Empty;
install();
//string csv_file_path = @””;
string[] filePath = Directory.GetFiles(@”Your CSV Sensor Data file directory”, “*.csv”);
int size = filePath.Length;
for (int i = 0; i < size; i++)
{
Console.WriteLine(filePath[i]);
csv_file_path = filePath[i];
}

DataTable csvData = GetDataTableFromCSVFile(csv_file_path);
Console.WriteLine(“Rows count:” + csvData.Rows.Count);
DataTable table = csvData;
foreach (DataRow row in table.Rows)
{
// Console.WriteLine(“—Row—“);
foreach (var item in row.ItemArray)
{

data = item.ToString();
Console.Write(data);

var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
//while (true)
//{

try
{
foreach (DataRow rows in table.Rows)
{
var info = new Accelerometer
{

ID = rows.ItemArray[0].ToString(),
Coordinate_X = rows.ItemArray[1].ToString(),
Coordinate_Y = rows.ItemArray[2].ToString(),
Coordinate_Z = rows.ItemArray[3].ToString()

};
var serializedString = JsonConvert.SerializeObject(info);
var message = data;
Console.WriteLine(“{0}> Sending events: {1}”, DateTime.Now.ToString(), serializedString.ToString());
eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(serializedString.ToString())));

}
}
catch (Exception ex)
{
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine(“{0} > Exception: {1}”, DateTime.Now.ToString(), ex.Message);
Console.ResetColor();
}
Task.Delay(200);
//}
}

}
// Console.ReadLine();

Console.WriteLine(“Press Ctrl-C to stop the sender process”);
Console.WriteLine(“Press Enter to start now”);
Console.ReadLine();

// SendingRandomMessages().Wait();

}

public static void install()
{
string url = @”https://…………blob.core.windows.net/accelerometer/AccelerometerSensorData.csv&#8221;;
WebClient wc = new WebClient();
wc.DownloadFileCompleted += new AsyncCompletedEventHandler(Completed);
wc.DownloadProgressChanged += new DownloadProgressChangedEventHandler(ProgressChanged);
// Console.WriteLine(“Download OnProgress……”);

ConsoleHelper.ProgressTitle = “Downloading”;
ConsoleHelper.ProgressTotal = 10;
for (int i = 0; i <= 10; i++)
{
ConsoleHelper.ProgressValue = i;
Thread.Sleep(500);
if (i >= 5)
{
ConsoleHelper.ProgressHasWarning = true;
}
if (i >= 8)
{
ConsoleHelper.ProgressHasError = true;
}
}
ConsoleHelper.ProgressTotal = 0;
try
{
wc.DownloadFile(new Uri(url), @”\ASA\Sensors\Accelerometer\AccelerometerSensorData.csv”);
}
catch (Exception ex)
{
while (ex != null)
{
Console.WriteLine(ex.Message);
ex = ex.InnerException;
}
}
}
public static void Completed(object sender, AsyncCompletedEventArgs e)
{
Console.WriteLine(“Download Completed!”);
}

public static void ProgressChanged(object sender, DownloadProgressChangedEventArgs e)
{
Console.WriteLine(“{0} Downloaded {1} of {2} bytes,{3} % Complete….”,
(string)e.UserState,
e.BytesReceived,
e.TotalBytesToReceive,
e.ProgressPercentage);
DrawProgressBar(0, 100, Console.WindowWidth, ‘1’);
}

private static void DrawProgressBar(int complete, int maxVal, int barSize, char ProgressCharacter)
{
Console.CursorVisible = false;
int left = Console.CursorLeft;
decimal perc = (decimal)complete / (decimal)maxVal;
int chars = (int)Math.Floor(perc / ((decimal)1 / (decimal)barSize));
string p1 = String.Empty, p2 = String.Empty;

for (int i = 0; i < chars; i++) p1 += ProgressCharacter;
for (int i = 0; i < barSize – chars; i++) p2 += ProgressCharacter;

Console.ForegroundColor = ConsoleColor.Green;
Console.Write(p1);
Console.ForegroundColor = ConsoleColor.DarkGreen;
Console.Write(p2);

Console.ResetColor();
Console.Write(“{0}%”, (perc * 100).ToString(“N2”));
Console.CursorLeft = left;
}
private static DataTable GetDataTableFromCSVFile(string csv_file_path)
{
DataTable csvData = new DataTable();
string data = string.Empty;
try
{
using (TextFieldParser csvReader = new TextFieldParser(csv_file_path))
{
csvReader.SetDelimiters(new string[] { “,” });
csvReader.HasFieldsEnclosedInQuotes = true;

//read column names
string[] colFields = csvReader.ReadFields();
foreach (string column in colFields)
{
DataColumn datecolumn = new DataColumn(column);
datecolumn.AllowDBNull = true;
csvData.Columns.Add(datecolumn);
}
while (!csvReader.EndOfData)
{
string[] fieldData = csvReader.ReadFields();

for (int i = 0; i < fieldData.Length; i++)
{
if (fieldData[i] == “”)
{
fieldData[i] = null;
}
}
csvData.Rows.Add(fieldData);

}
}
}
catch (Exception ex)
{

}
return csvData;
}

 

Now, built out ASA SQL query with specific window interval like in this demo, used ‘SlidingWindow(Second,no of interval)’ which generates computation on event hubs data based on the specific time interval mentioned in window.

ASAQuery

 

Next, start implement the processed output visualization on PowerBI preview portal by selecting ‘Output’ tab of ASA job. Once, you provide all the dataset name of output & start the ASA job, on PowerBI portal, would be able to see the specific dataset is created with a small yellow star icon beside.SensorsPowerBI

 

Here goes a step by step demonstration with video available on my Youtube channel.

Microsoft IoT Foundation: Realtime Tweets Streaming into Azure Stream Analytics with PowerBI & PowerBI Designer Preview


The Azure Stream Analytics(ASA) is one of the major component of Microsoft #IoT foundation which has got ‘PowerBI‘ as its output connector for visualization of realtime data streaming into Event hub to Stream Analytics hub, just one month back as ‘public preview’.

In this demo, we’re going to focus to end to end realtime Tweets analytics collecting through Java code using ‘Twitter4j’ library, then store it into OneDrive storage as .csv file as well as storing it into Azure storage as block blob. Then, sending realtime tweets streamed into Service Bus Event Hubs for processing , so, after creating the stream analytics job make sure that the input connector is properly selected as data stream for ‘event hub’, then process ASA SQL query with specific ‘HoppingWindow(second,3) & ‘SlidingWindow(Minute,10,5) with overlapping/non-overlapping window frame of data streaming.

Finally , select the output connector as PowerBI & authorize with your organisational account. Once, your ASA job starts running, you would be able to see the powerbi dataset which you have selected as powerbi output dataset name, start building the ASA connected PowerBI report & Dashboard.

First, a good amount of real tweets are collected based on the specific keywords like #IoT, #BigData, #Analytics, #Windows10, #Azure, #ASA, #HDI, #PowerBI, #AML, #ADF etc.

The sample tweets are looks like this

DateTime,TwitterUserName,ProfileLocation,MorePreciseLocation,Country,TweetID
06/24/2015 07:25:19,CodeNotFound,France,613714525431418880
06/24/2015 07:25:19,sinequa,Paris – NY- London – Frankfurt,613714525385289728
06/24/2015 07:25:20,RavenBayService,Calgary, Alberta,613714527302098944
06/24/2015 07:25:20,eleanorstenner,,613714530112274432
06/24/2015 07:25:21,ISDI_edu,,613714530758230016
06/24/2015 07:25:23,muthamiphilo,Kenya,613714541562740736
06/24/2015 07:25:23,tombee74,ÜT: 48.88773,2.23806,613714541931851776
06/24/2015 07:25:25,EricLibow,,613714547975790592

Now,  the data is sent to event hub for realtime processing & we’ve written the ASA-SQL like this.

CREATE TABLE input(
DateTime nvarchar(MAX),
TwitterUserName nvarchar(MAX),
ProfileLocation nvarchar(MAX),
MorePreciseLocation nvarchar(MAX),
Country nvarchar(MAX),
TweetID nvarchar(MAX))
SELECT input.DateTime, input.TwitterUserName,input.ProfileLocation,
input.MorePreciseLocation,input.Country,count(input.TweetID) as TweetCount
INTO output
FROM input Group By input.DateTime, input.TwitterUserName,input.ProfileLocation,input.MorePreciseLocation,
input.Country, SlidingWindow(second,10)

Output

Authorize

Next, start build up the PowerBI report on PowerBI preview portal. Once you build the Dashboard with report by pinning the graphs, it would like something like this.

IoTAnalytics

Analytics

You could be able to visualize the realtime update of data like #total tweet counts on the specific keywords, #total twitterusername tweeted , #total tweetloation etc.

WorldwideTweet

In another demo, we’ve used the PowerBI Designer preview tool by collecting processed tweets coming out from ASA hub to ‘Azure Blob Storage’ & then picking it into ‘PowerBI Designer Preview’.

PBIDesigner

In latest PBI , we’ve got support of combo stacked chart, which we’ve utilized to depict #average tweetcount of those specific keywords by location & timeframe for few minutes & seconds interval.

TweetComboStacked

Also, you could support for well end PowerQ&A features as well like ‘PowerBI for Office 365’ which has natural language processing (NLP) backed by Azure Machine Learning processing power enabled.

like if I throw a question on these realworld streaming dataset on PowerQ&A

show tweetcount where profilelocation is bayarea & London, Auckland, India, Bangalore,Paris as stacked column chart

PowerQ&A

After that, save the PBI designer file as .pbix & upload into www.powerbi.com , under get data->Local File section. It has got support for uploading PBI designer file as well as data source connector.

PBI

Upon uploading, built out the dashboard which has got facility of schedule refresh on preview portal itself. Right click on your PBI report on portal, select settings to open the schedule refresh page.

Settings

ScheduleRefresh

Here goes the realtime scheduled refresh dashboard of Twitter IoT Analytics on realtime tweets.

PBI-Portal

The same PBI dashboards can be visualized from the ‘PowerBI app for Windows Store or iOS’ . Here goes a demonstration.

WP_20150624_22_21_52_Pro

WP_20150624_22_22_20_Pro

Automated Provisioning of Azure Virtual Machines with PowerShell using Runbook


Recently, I have been adding a lot of energy towards the latest additions of Azure family, like Automation API, Scheduler, Machine Learning (ML) on HDInsight, StorSimple (checking it from today itself in management portal). With my utmost curiosity researched & noted down a few points to be taken care of while writing custom IaaS PowerShell scripts to provision fresh Azure VM image using traditional Azure cmdlets.

$adminPassword = '[YOUR-PASSWORD]'
$vmname = 'mytestvm1'
New-AzureQuickVM -Windows -ServiceName $cloudSvcName -Name $vmname -ImageName $image -Password $adminPassword


Most of us got familiar with these script while issue happens providing the $imagename , for Windows Server 2012 DataCenter , the command would be like this:

$cloudSvcName = '[Your Cloud Service Name]'
$vmname = '[Name of VM]'
$availabilityset = '[Name of Availability set]' (Optional)
$admin = '[Your Username]'
$password = '[Your Password]'
New-AzureQuickVM -Windows -ServiceName $cloudSvcName -AvailabilitySetName $availabilityset  -Name $vmname -ImageName "
bd507d3a70934695bc2128e3e5a255ba__RightImage-Windows-2012-x64-v5.8.8.12" -AdminUsername $admin –Password $password 

After provisioning , you would be able to see the default endpoints.
Remote EndpointEndPoint

The default configuration of VM would be (A1 1 core, 1.75 GB Memory) with Standard Tier in order to put multiple VMs on same load-balanced endpoint & ease autoscaling .

In next article, I would travel around PowerShell automation scripts using Run book utilizing Azure VM, Storage & Cloud services.