Friday, December 26, 2014

Node.js event loop and I/O threads

Everything runs in parallel except your code

Node.js has an asynchronous event loop running on a single thread.  It is woken up by the OS for incoming requests, farms out I/O work in a non-blocking style, and the results come back to the event loop via callbacks.  Thus the event loop never blocks on I/O.
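
A minimal sketch of what "never blocks on I/O" looks like in code (the file path is made up for illustration): the callback-based read is farmed out by libuv, while a synchronous variant would hold up the event loop.

  var fs = require('fs');

  // The read is handed off by libuv; the callback fires on a later
  // turn of the event loop once the data is ready.
  fs.readFile('/tmp/some-file.txt', function (err, data) {
    if (err) throw err;
    console.log('read finished, %d bytes', data.length);
  });

  // This line runs immediately, before the file read completes.
  console.log('read scheduled, the event loop keeps going');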

Below is a diagram Aaron Stannard used to illustrate the Node.js processing model.


Node.js' event loop is built on libuv's default loop (uv_default_loop()).  libuv was originally an abstraction over libev (on Unix) and IOCP (on Windows), providing a libev-like API to talk to the kernel's event notification mechanisms; since node v0.9.0, the libev dependency has been removed.  libuv is developed for use by Node.js as a multi-platform support library with a focus on async I/O.

libuv's GitHub page states that it supports:
1. event loop backed by epoll, kqueue, IOCP, event ports
2. async tcp & udp sockets
3. async dns resolution
4. async file & file system operations (using fs.openSync is very wrong!)
5. file system events
6. IPC with socket sharing
7. child processes
8. signal handling

As you can see, all the I/O runs on the worker threads and does not block the main thread (the event loop); all your code, in contrast, runs inside the event loop.  And since there is only one event loop, everything runs in parallel in Node.js except your code.

That's why it's important for the application to be I/O-bound rather than CPU-bound: a CPU-heavy function (and we're not even talking about implementing fibonacci, as Ted did) will block the event loop and stop it from accepting the next request.  Incoming requests will be queued (by the OS, I presume), and if the block lasts long enough they will time out instead of performance degrading gracefully with load.  It works like cooperative multitasking: a particular request's operation keeps running until it cedes control back to the event loop.

For CPU-intensive tasks, we can:

1. yield back to the event loop using process.nextTick(): break the work into smaller chunks so that after processing one chunk, the next chunk typically runs on the next turn of the event loop, usually before any other I/O events fire.  However, it's still possible to starve the event loop by scheduling too many callbacks with process.nextTick(); process.maxTickDepth can be used to limit how many process.nextTick() callbacks are evaluated before other I/O events.  (A sketch of this chunking pattern follows this list.)

2. leverage the node-webworker-threads library to offload work to threads (if the task is pure JavaScript), or create a pool of child processes for it.

3. implement a custom event and extension to libuv, though the task will have to be written in C++.
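
A minimal sketch of the chunking idea from option 1 (the array, chunk size and per-item work are made up; setImmediate is used here instead of process.nextTick so that pending I/O gets a chance to run between chunks):

  function doSomeCpuWork(item) { /* hypothetical per-item CPU work */ }

  // Process a big array without hogging the event loop.
  function processInChunks(items, chunkSize, done) {
    var i = 0;
    (function next() {
      var end = Math.min(i + chunkSize, items.length);
      for (; i < end; i++) {
        doSomeCpuWork(items[i]);
      }
      if (i < items.length) {
        setImmediate(next);   // yield to the event loop, then continue
      } else {
        done();
      }
    })();
  }

  processInChunks(new Array(100000), 1000, function () { console.log('done'); });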

Now, since almost everything runs on a single thread (even though I/O is offloaded to a thread pool, for most applications that is only a small portion of the CPU time), Node.js will use a single core no matter how many cores our server has.  One solution is to start multiple instances of Node.js, but that introduces some maintenance overhead and each instance has to listen on a different port.  Node.js provides a clustering solution in which the master process listens on the port and forwards requests in a round-robin fashion to the child processes.
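
A minimal sketch of the built-in cluster module (the port number is arbitrary): the master forks one worker per core and restarts any worker that dies.

  var cluster = require('cluster');
  var http = require('http');
  var numCPUs = require('os').cpus().length;

  if (cluster.isMaster) {
    // The master accepts connections and distributes them to the workers.
    for (var i = 0; i < numCPUs; i++) cluster.fork();
    cluster.on('exit', function (worker) {
      console.log('worker %d died, restarting', worker.process.pid);
      cluster.fork();
    });
  } else {
    // Each worker runs its own event loop and its own server instance.
    http.createServer(function (req, res) {
      res.end('handled by worker ' + process.pid + '\n');
    }).listen(8000);
  }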

Note that clustering is not the solution for CPU-intensive tasks.  Each child process has its own event loop, and if that event loop is blocked, the master doesn't know it is blocked and the next request will still be queued to it.

Also, if we are using Mesos with Docker, we just have to declare that the application requires 1 CPU (or 2?) and we can easily run multiple instances of the application on the same host.

Another solution is to try out JXCore, which is a multithreaded port of Node.js.  It even claims it "performs even better with applications requiring more CPU power".

By default, libuv has 4 I/O threads, which is good enough most of the time.  If your application has slow I/O, all 4 I/O threads could be tied up.  We can set the environment variable UV_THREADPOOL_SIZE to increase the number of I/O threads if the I/O is simply slow by nature.  However, if it's abnormal for that I/O to be slow (due to heavy load or some other failure), we should set a timeout instead.
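
A rough way to see the 4-thread default in action (key length and iteration count are arbitrary): fire more threadpool-backed calls than there are threads and watch them complete in batches.  The pool size itself has to be raised from the environment, e.g. UV_THREADPOOL_SIZE=16 node app.js.

  var crypto = require('crypto');
  var start = Date.now();

  for (var i = 1; i <= 8; i++) {
    (function (n) {
      // pbkdf2 runs on the libuv threadpool, just like fs and dns.lookup.
      crypto.pbkdf2('secret', 'salt', 100000, 64, 'sha512', function () {
        console.log('call %d done after %d ms', n, Date.now() - start);
      });
    })(i);
  }
  // With the default pool of 4 threads, calls 5-8 finish in a second batch.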

Note: our application uses request a lot to call other services; however, from its documentation it appears to only have a timeout for the response, not a connection timeout.
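
For reference, this is roughly how a timeout is passed to request (the URL and value are made up; whether it also covers the connection phase is exactly the open question above):

  var request = require('request');

  request({ url: 'http://some-internal-service/api/thing', timeout: 5000 },
    function (err, res, body) {
      if (err) {
        // err.code is typically ETIMEDOUT when the timeout fires
        return console.error('request failed:', err.code);
      }
      console.log('status:', res.statusCode);
    });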

Finally, we need a tool to monitor the event loop and the I/O threads in order to tune our application.  How do we know whether the little things we do between I/O calls are blocking the event loop enough that requests start queuing up while the other CPUs sit idle?  Or that all 4 I/O threads are blocked, so requests are accepted but nothing is being processed?  (Our application uses clustering, and we once had an issue after turning on debug logging: the disk was probably too slow to write all those logs, all the I/O threads were used up, and the cluster killed and restarted the child process!)  StrongLoop has a monitoring solution that can monitor the event loop, GC, CPU and memory usage.
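
In the absence of a full monitoring product, a crude event-loop-lag probe can be rolled by hand (the interval and threshold are arbitrary): schedule a timer and measure how late it fires.

  var INTERVAL = 1000;   // how often to sample, in ms
  var last = Date.now();

  setInterval(function () {
    var now = Date.now();
    var lag = now - last - INTERVAL;   // how late this tick fired
    if (lag > 100) {
      console.warn('event loop lag: %d ms', lag);
    }
    last = now;
  }, INTERVAL);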



References:
http://nikhilm.github.io/uvbook/index.html
https://github.com/libuv/libuv
Bert Belder - libuv - https://www.youtube.com/watch?v=nGn60vDSxQ4
http://howtonode.org/understanding-process-next-tick
http://nodejs.org/api/process.html#process_process_nexttick_callback
http://stackoverflow.com/questions/17956692/child-process-for-cpu-intensive-task
https://github.com/audreyt/node-webworker-threads
http://zef.me/4561/node-js-and-the-case-of-the-blocked-event-loop/
http://strongloop.com/strongblog/whats-new-in-node-js-v0-12-cluster-round-robin-load-balancing/
http://www.kennynet.co.uk/2014/07/07/nodejs-and-blocked-io/
http://stackoverflow.com/questions/15526546/confusion-about-node-js-internal-asynchronous-i-o-mechanism
https://github.com/request/request

Reactive Microservice Part 4

Best practices

Shared Test Environment

Developers can run CI jobs in the test environment, which is very close to the prod environment. Its stability is treated with the same respect as trunk, which stays open and stable, so developers can get a clear signal from their tests by testing their local changes against the shared test environment.

Canary deployment

Canary services can be rolled out little by little, directing traffic towards them. If there's any problem, we can roll back the service. Netflix Asgard is a deployment management tool which manages red/black pushes.

Monitor Everything

Service dependency visualization should be able to answer questions like: how many dependencies does my service have? What's the call volume on my service? Are any dependency services running hot? What are the top N slowest "business transactions"? Distributed tracing can be used to build a graph of all the service dependencies. Some APM (application performance monitoring) tools like Dynatrace can also trace a request through multiple services. Dashboard metrics (a quick scan of service health: median, 99th percentile, 99.9th percentile) are also essential, e.g. Netflix Hystrix/Turbine; 500px is using Datadog, and Google has Cloud Trace and Cloud Debug (part of Google Cloud Monitoring). Some other metrics for measuring incident response are MTTD (mean time to detection), MTTR (mean time to resolution) and the method of detection.

12 factor application

The twelve-factor app is a methodology for building software-as-a-service apps that:
  • Use declarative formats for setup automation, to minimize time and cost for new developers joining the project;
  • Have a clean contract with the underlying operating system, offering maximum portability between execution environments;
  • Are suitable for deployment on modern cloud platforms, obviating the need for servers and systems administration;
  • Minimize divergence between development and production, enabling continuous deployment for maximum agility;
  • And can scale up without significant changes to tooling, architecture, or development practices.
Below are the 12 factors:
  1. Codebase – one codebase tracked in revision control, many deploys
  2. Dependencies – Explicitly declare and isolate dependencies
  3. Config – Store config in the environment (see the sketch after this list)
  4. Backing Services – Treat backing services as attached resources
  5. Build, release, run – strictly separate build and run stages
  6. Processes – execute the app as one or more stateless processes
  7. Port binding – export services via port binding
  8. Concurrency – scale out via the process model
  9. Disposability – maximize robustness with fast startup and graceful shutdown
  10. Dev/prod parity – keep development, staging and production as similar as possible
  11. Logs – treat logs as event streams
  12. Admin processes – run admin/management tasks as one-off processes
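
A minimal sketch of factor 3 in Node.js (the variable names are made up): configuration comes from the environment, not from files baked into the codebase.

  // config.js - everything that varies between deploys lives in env vars
  module.exports = {
    port: parseInt(process.env.PORT, 10) || 3000,
    databaseUrl: process.env.DATABASE_URL,   // backing service as an attached resource (factor 4)
    logLevel: process.env.LOG_LEVEL || 'info'
  };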

Automation

Automate everything, from building and testing to deployment and rollback. Developers should be self-service, with no need to ask the production team or the build team to enable them to develop.

Mock services

Each service should have a mock service for consumer services to test against. It's best provided by the team that develops the service, but in reality it is often developed by the consumer service's team.
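
A minimal sketch of such a mock, assuming Express and a made-up user endpoint: it just returns canned responses shaped like the real service's contract so the consumer team can test against it.

  var express = require('express');
  var app = express();

  // Canned response matching the real service's response shape.
  app.get('/users/:id', function (req, res) {
    res.json({ id: req.params.id, name: 'Test User', plan: 'free' });
  });

  app.listen(4000, function () {
    console.log('mock user service listening on port 4000');
  });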

Service Chassis (@Kixeye)

A service chassis is a framework for developing microservices that has all the core integrations included:

  • Configuration integration
  • Registration and discovery
  • Monitoring and metrics
  • Load-balancing for downstream services
  • Failure management for downstream services
  • Dev: service template in maven
  • Deployment: build pipeline through puppet -> packer -> ami
  • NetflixOSS: asgard, hystrix, ribbon + websockets Janus, eureka

Reactive Microservice Part 3

Resilient

The system stays responsive in the face of failure. This applies not only to highly-available, mission critical systems — any system that is not resilient will be unresponsive after a failure. Resilience is achieved by replication, containment, isolation and delegation. Failures are contained within each component, isolating components from each other and thereby ensuring that parts of the system can fail and recover without compromising the system as a whole. Recovery of each component is delegated to another (external) component and high-availability is ensured by replication where necessary. The client of a component is not burdened with handling its failures.
Since each service has its own single responsibility and is independent, failure is contained and isolated in one service. If one of the services is down, it won't affect the rest of the site.
Failure is treated as a first class citizen.
  • Redundancy for machine/ cluster/ data center failures
  • Load-balancing and flow control for service invocations
  • Client protects itself with standard failure management patterns: timeouts, retries, circuit breakers.
Resiliency has to be tested too. Resiliency tests or game days should be executed on the production system, making sure the servers respond well under failure.

API gateway (Netflix) / API facades (500px)

API gateways/facades are owned by the web team and are basically a proxy to the various backend systems. Since they are owned by the web team, there's no idle time waiting for the service team to build the service. They also abstract away the concurrency, pulling data from the various services and flattening the results.

Isolate access to persistence

Persistence should not be shared between services; sharing breaks the independence of both development and deployment of a service. However, this is one of the aspects of microservices that practitioners struggle with most, especially those with an existing monolithic server that shares the same database.

Loadbalancer

Use a smart client load balancer. Netflix has open-sourced Ribbon for this purpose. It can avoid a whole zone if that zone goes bad.

Resiliency/Availability

Microservices do not automatically mean better availability. If a service fails, the consumer service can go into a loop and soon all available threads will be consumed. Employ circuit breakers, retries, bulkheading and fallbacks; Hystrix from Netflix provides support for these patterns.
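
A very simplified circuit breaker in JavaScript (thresholds and names are made up; Hystrix does far more, e.g. bulkheading, metrics and fallbacks): after enough failures it fails fast instead of letting callers pile up.

  function circuitBreaker(fn, maxFailures, resetMs) {
    var failures = 0;
    var openedAt = 0;

    return function (args, callback) {
      // While the circuit is open, fail fast instead of tying up resources.
      if (failures >= maxFailures && Date.now() - openedAt < resetMs) {
        return callback(new Error('circuit open'));
      }
      fn(args, function (err, result) {
        if (err) {
          failures++;
          if (failures >= maxFailures) openedAt = Date.now();   // (re)open
        } else {
          failures = 0;   // a success closes the circuit again
        }
        callback(err, result);
      });
    };
  }

A call to an unreliable downstream service would then be wrapped as, say, circuitBreaker(callUserService, 5, 30000), with callUserService being a hypothetical callback-style client function.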

Test service for resiliency

Test for resiliency, as failures in production will happen. We need to test for latency/errors (Simian Army), dependency service unavailability and network errors. The system should be optimized for responding to failures.

Rest.li + deco

Rest.li is the RESTful services framework from LinkedIn, which also gives type-safe bindings and async, non-blocking I/O. It also provides dynamic service discovery and client-side load balancing using ZooKeeper.
All domain models in LinkedIn are normalized; the link between two domain models is a URN, a fully qualified foreign key (e.g. urn:li:member:123). Deco is a URN resolution library. Services tell Deco what data they need (not how to get it), and Deco looks into the URN and fetches the data for the service.

Dynamic load balancer

Mailgun implemented a dynamic load balancer which basically combines service registration/discovery, load balancing, circuit breaking (retries) and canary deployment into one. A new service can simply start up and register with the load balancer, and other services can start consuming it. Multiple versions of a service can run at the same time, thus enabling canary deployment: without disrupting the existing services, we can start the new version of the service and then start the consumers that will use the new version, so we can retire the old service gracefully.


Elastic

The system stays responsive under varying workload. Reactive Systems can react to changes in the input rate by increasing or decreasing the resources allocated to service these inputs. This implies designs that have no contention points or central bottlenecks, resulting in the ability to shard or replicate components and distribute inputs among them. Reactive Systems support predictive, as well as Reactive, scaling algorithms by providing relevant live performance measures. They achieve elasticity in a cost-effective way on commodity hardware and software platforms.
The goal of an elastic system is to:
  • Scale up and down service instances according to load
  • Gracefully handle spiky workloads
  • Predictive and reactive scaling

Resource contention

It happens when there is a conflict over access to a shared resource.
  • Message-driven/async. A service should only hold on to a resource while it's using it. Traditional blocking I/O (BIO) blocks the thread while it waits for I/O operations, so the resource is not released and can cause contention.
  • Reactive all the way down. However, in some cases it's not possible; the best example is JDBC, where there is no async driver (except an experimental one for Oracle and MySQL). This is also one of the reasons for moving to NoSQL. One possible way to deal with it is to use the Actor model.
  • Besides idle threads, shared/distributed state also constitutes a contention point. It should be moved to the edge, like the client or an external datastore, to reduce contention.

Fan out

One of the challenges of implementing microservices is fan-out, both in the UI and among services. Fan-out in the UI can be resolved by using an API gateway, which pushes the fan-out problem to the server side (with lower network latency) by returning a materialized view instead. Server-side caching can be used to reduce the load on the services, and instead of caching per service, we can cache the composite/materialized view.
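
A rough sketch of a gateway building such a materialized view (the services and fields are made up, and the per-service calls are stubbed out; in reality they would be HTTP requests):

  // Hypothetical per-service calls, stubbed for illustration.
  function getUser(id)   { return Promise.resolve({ id: id, name: 'Ann' }); }
  function getOrders(id) { return Promise.resolve([{ orderId: 1 }, { orderId: 2 }]); }

  // The gateway fans out to both services and returns one flattened view.
  function getProfileView(userId) {
    return Promise.all([getUser(userId), getOrders(userId)])
      .then(function (results) {
        return { user: results[0], orders: results[1] };   // the materialized view
      });
  }

  getProfileView(42).then(function (view) { console.log(view); });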

Bottlenecks/hotspots

Services like the user service might be required by multiple services in the same flow; those services become hotspots of the system. To reduce the dependency, we can pass the data through a header instead of having every individual service depend on that service.

Message driven

Reactive Systems rely on asynchronous message-passing to establish a boundary between components that ensures loose coupling, isolation, location transparency, and provides the means to delegate errors as messages. Employing explicit message-passing enables load management, elasticity, and flow control by shaping and monitoring the message queues in the system and applying back-pressure when necessary. Location transparent messaging as a means of communication makes it possible for the management of failure to work with the same constructs and semantics across a cluster or within a single host. Non-blocking communication allows recipients to only consume resources while active, leading to less system overhead.
Message-driven means:
  • Asynchronous message-passing over synchronous request-response
  • Often custom protocol over TCP/UDP or WebSockets over HTTP

Backpressure

An example of backpressure is a user uploading a file. Traditionally, the web server saves it to a local file and then uploads it to storage (Express does this). With streaming, the web server can pipe the file to storage while the user is uploading it. But what if the user uploads faster than the server can save to storage? Backpressure is needed to control the flow: the stream can throttle the upload from the user so that it matches the speed at which the file is being stored (see the sketch after the list below). So backpressure is:
  • Stream-oriented
  • Sensible behavior on resource contention
  • Avoids unnecessary buffering
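
A minimal sketch of the streaming version in Node.js (the destination is just a local file here; the same applies when piping into a storage client's write stream): pipe() propagates backpressure, pausing the incoming request whenever the destination cannot keep up.

  var http = require('http');
  var fs = require('fs');

  http.createServer(function (req, res) {
    var dest = fs.createWriteStream('/tmp/upload.bin');
    // pipe() pauses req when dest's buffer is full and resumes it once
    // dest drains, so a fast uploader cannot overwhelm slow storage.
    req.pipe(dest);
    dest.on('finish', function () { res.end('stored\n'); });
  }).listen(8080);
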
Back to Rx: there are two kinds of observables, hot and cold.

Hot observable

Typically we’ll use temporal operator – sample (take a sample of the stream in a time window), throttleFirst (take the first value throttled in a time window), debounce (wait until it pause a bit), buffer (base on time) or window (by time or count). Or combining buffer and debounce.

Reactive pull

Dynamic push-pull: push (reactive) when the consumer keeps up with the producer, and switch to pull (interactive) when the consumer is slow. Bound all* queues vertically, not horizontally; backpressure applies to a given stream, not all streams together.
Cold streams support pull by definition, as we can control the speed. Hot streams receive a signal to support pull.
stream.onBackpressure(buffer | drop | sample | scaleHorizontally).observeOn(aScheduler)

So on a backpressure signal we can choose between different strategies: we can buffer in memory, in Kafka or on disk; we can drop or sample the stream; or we can scale horizontally (Netflix can stand up a new server on Mesos).