How do I process data from the database concurrently in Go?

Hello, everyone! Please help me figure this out.

I have a single-threaded Go application. Using the GORM library, it fetches the list of all products from the database (PostgreSQL). Then a loop goes through each product and, depending on the value of the status column, updates the product's blocked column.

var Tracker = func() {
	var products []models.Product

	// Get the products list from the database.
	if err := db.Find(&products).Error; err != nil {
		log.Fatal(err)
	}

	// Walk through the products list.
	for i := 0; i < len(products); i++ {
		// Depending on the product status, perform different actions.
		if products[i].Status == 1 {
			if err := db.Model(&products[i]).Update("blocked", false).Error; err != nil {
				log.Println(err)
				return
			}
		} else if products[i].Status == 2 {
			if err := db.Model(&products[i]).Update("blocked", true).Error; err != nil {
				log.Println(err)
				return
			}
		}
	}
}


I am trying to move this single-threaded logic to concurrent processing. How correct is the following code in your opinion, and what bottlenecks does it have? How else could the speed be improved?

var Tracker = func() {
	var products []models.Product

	// Get the products list from the database.
	if err := db.Find(&products).Error; err != nil {
		log.Fatal(err)
	}

	// Launch a goroutine per product.
	for _, product := range products {
		go Checker(product)
	}
}

func Checker(product models.Product) {
	if product.Status == 1 {
		if err := db.Model(&product).Update("blocked", false).Error; err != nil {
			log.Println(err)
			return
		}
	} else if product.Status == 2 {
		if err := db.Model(&product).Update("blocked", true).Error; err != nil {
			log.Println(err)
			return
		}
	}
}
March 19th 20 at 09:07
2 answers
March 19th 20 at 09:09
Solution
In my opinion, this is just one DB query: UPDATE products SET blocked = (status = 2) WHERE status IN (1, 2)
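For illustration only, a minimal sketch of running that single query through GORM, assuming the db handle from the question:

// One round trip instead of one UPDATE per product.
if err := db.Exec("UPDATE products SET blocked = (status = 2) WHERE status IN (1, 2)").Error; err != nil {
	log.Println(err)
}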
Hello, Sergey! Thank you for your response. In fact, that was only a made-up example, a poorly chosen one as I now understand, and I apologize. I wanted to find out what the bottlenecks and pitfalls are in terms of concurrency. Here is the code I am actually using at the moment. I'm not sure it can be done in one SQL query, because it checks whether the current time falls inside a period (startPeriod, endPeriod).

***
currentTime := time.Now().In(timezone).UTC().Add(6 * time.Hour)

for _, product := range products {
 go Checker(product, currentTime)
}
***

func Checker(product models.Product, currentTime time.Time) {
	startPeriod := product.StartPeriod.UTC()
	endPeriod := product.EndPeriod.UTC()

	if !product.Blocked {
		// The period has started but not ended yet: block the product.
		if startPeriod.Before(currentTime) && endPeriod.After(currentTime) {
			if err := database.DBGORM.Find(&product).Error; err != nil {
				log.Println(err)
				return
			}
			if err := database.DBGORM.Model(&product).Update("blocked", true).Error; err != nil {
				log.Println(err)
				return
			}
		}
	} else {
		// The period is over: move the product to condition 3.
		if endPeriod.Before(currentTime) {
			if err := database.DBGORM.Find(&product).Error; err != nil {
				log.Println(err)
				return
			}
			if err := database.DBGORM.Model(&product).Update("condition", 3).Error; err != nil {
				log.Println(err)
				return
			}
		}
	}
}


At the moment I have tested this code on 100 records and everything worked without any problems; in tests with 200 records a warning appeared: pq: sorry, too many clients already. Still, everything in the database did get changed, just a little late. In theory, how many goroutines can you create? It depends on the machine, am I right? - Orlando commented on March 19th 20 at 09:12
It depends on the machine, am I right?

Here it depends more on Postgres. Roughly speaking, it creates a separate process for each connection, so it is very sensitive to the total number of connected clients. You should start by creating a dozen workers, each of which receives one item from a channel with a capacity of 10 and processes it. Examples of such processing can be found in the books.
However, with this kind of concurrent processing you can run into a deadlock, and it won't be very efficient in terms of database performance either. Try to rewrite the processing as one SQL query; with the code above that should not be too difficult. The current time can be passed as a query parameter. - Hazle.Howe commented on March 19th 20 at 09:15
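As an illustration of that starting point (my sketch, not Hazle.Howe's code), assuming the models.Product type from the question and a hypothetical processOne function standing in for the per-product update:

// A fixed pool of workers drains a small buffered channel of products.
func runWorkers(products []models.Product, workers int) {
	jobs := make(chan models.Product, 10)
	var wg sync.WaitGroup

	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range jobs {
				processOne(p) // placeholder for the Checker logic
			}
		}()
	}

	for _, p := range products {
		jobs <- p
	}
	close(jobs)
	wg.Wait() // do not return until every product has been handled
}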
@Hazle.Howe, Sergey, thanks for the advice! To be honest, I don't understand how to rewrite the current code as a single query. Could you suggest how?

Do I understand correctly that the workers and the channel should be created like this:
var Tracker = func() {
	// Initialize the slice.
	var products []models.Product

	// Get the products list from the database.
	if err := db.Find(&products).Error; err != nil {
		log.Fatal(err)
	}

	// Create a buffered channel.
	channel := make(chan models.Product, len(products))

	// Start 5 workers.
	for w := 1; w <= 5; w++ {
		go Worker(w, channel)
	}

	// Send the products into the channel.
	for _, product := range products {
		channel <- product
	}
	close(channel)
}

func Worker(id int, channel <-chan models.Product) {
	for product := range channel {
		// Rest of code
	}
}
- Orlando commented on March 19th 20 at 09:18
BEGIN;
UPDATE product SET condition=3 WHERE blocked AND %(now) > endPeriod;
UPDATE product SET blocked=true WHERE NOT blocked AND %(now) BETWEEN startPeriod AND endPeriod;
COMMIT;

If I understand correctly, something like that.

I can't really advise on the Go code, I'm new to it. - Hazle.Howe commented on March 19th 20 at 09:21
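For the Go side, a rough sketch (my illustration, not Hazle.Howe's code) of running those two UPDATEs in one GORM transaction with the current time passed as a parameter; the table and column names are guesses based on the thread:

// Both statements either apply together or not at all.
func trackProducts(db *gorm.DB, now time.Time) error {
	tx := db.Begin()
	if tx.Error != nil {
		return tx.Error
	}
	if err := tx.Exec("UPDATE products SET condition = 3 WHERE blocked AND ? > end_period", now).Error; err != nil {
		tx.Rollback()
		return err
	}
	if err := tx.Exec("UPDATE products SET blocked = TRUE WHERE NOT blocked AND ? BETWEEN start_period AND end_period", now).Error; err != nil {
		tx.Rollback()
		return err
	}
	return tx.Commit().Error
}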
@Hazle.Howe, Sergey, thanks! Based on your last comment I created a procedure. It looks as follows.
CREATE PROCEDURE tracker(CUSTOM_TIME VARCHAR) AS $FUNCTION$
BEGIN
 UPDATE SURVEYS SET CONDITION = 3 WHERE BLOCKED AND CONDITION = 2 AND CUSTOM_TIME > END_PERIOD;
 UPDATE SURVEYS SET BLOCKED = TRUE WHERE NOT BLOCKED AND CONDITION = 2 AND CUSTOM_TIME BETWEEN START_PERIOD AND END_PERIOD;
END;
$FUNCTION$ LANGUAGE plpgsql;


Procedure call:
CALL tracker('2019-03-29 16:37:00');

When I try to call this procedure, I get the error:
SQL Error [42883]: ERROR: operator does not exist: character varying > timestamp without time zone
Hint: No operator matches the given name and argument types. You might need to add explicit type casts.
Where: PL/pgSQL function tracker(character varying) line 3 at SQL statement


It complains about the time comparison in the query. What am I doing wrong in the query? - Orlando commented on March 19th 20 at 09:24
@Orlando, well, varchar and timestamp are different types. Either cast it, or pass a timestamp to the procedure. - Queenie.Rau25 commented on March 19th 20 at 09:27
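For illustration, assuming the parameter is redeclared as tracker(CUSTOM_TIME TIMESTAMP) rather than VARCHAR (a cast inside the procedure body would work too), the call from Go could pass a real time value:

// Sketch: the driver sends a proper timestamp, so no text-to-timestamp comparison is needed.
if err := database.DBGORM.Exec("CALL tracker(?)", time.Now().UTC()).Error; err != nil {
	log.Println(err)
}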
@Queenie.Rau25, got it, I figured it out. Thank you! :) - Orlando commented on March 19th 20 at 09:30
March 19th 20 at 09:11
Solution
If we set aside the fact that this can really be done with one query (see the example above), there are still things to improve.

First, I'm not familiar with this ORM, but I think transactions are clearly missing here.
Second, if there are going to be a lot of records, that will be too many goroutines. I would create n workers and feed them tasks through a channel.
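On the transactions point, a minimal sketch of what wrapping the updates from the question in a single GORM transaction could look like (my illustration, assuming the db handle and models from the question):

tx := db.Begin()
if tx.Error != nil {
	log.Println(tx.Error)
	return
}
for i := range products {
	// Same per-status logic as in the question, only against tx so all rows change atomically.
	if products[i].Status != 1 && products[i].Status != 2 {
		continue
	}
	if err := tx.Model(&products[i]).Update("blocked", products[i].Status == 2).Error; err != nil {
		tx.Rollback()
		log.Println(err)
		return
	}
}
if err := tx.Commit().Error; err != nil {
	log.Println(err)
}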
Hello, Leonid! Thank you for your response. In fact, that was only a made-up example, a poorly chosen one as I now understand, and I apologize. I wanted to find out what the bottlenecks and pitfalls are in terms of concurrency. Here is the code I am actually using at the moment. I'm not sure it can be done in one SQL query, because it checks whether the current time falls inside a period (startPeriod, endPeriod).

***
currentTime := time.Now().In(timezone).UTC().Add(6 * time.Hour)

for _, product := range products {
 go Checker(product, currentTime)
}
***

func Checker(product models.Product, currentTime time.Time) {
	startPeriod := product.StartPeriod.UTC()
	endPeriod := product.EndPeriod.UTC()

	if !product.Blocked {
		// The period has started but not ended yet: block the product.
		if startPeriod.Before(currentTime) && endPeriod.After(currentTime) {
			if err := database.DBGORM.Find(&product).Error; err != nil {
				log.Println(err)
				return
			}
			if err := database.DBGORM.Model(&product).Update("blocked", true).Error; err != nil {
				log.Println(err)
				return
			}
		}
	} else {
		// The period is over: move the product to condition 3.
		if endPeriod.Before(currentTime) {
			if err := database.DBGORM.Find(&product).Error; err != nil {
				log.Println(err)
				return
			}
			if err := database.DBGORM.Model(&product).Update("condition", 3).Error; err != nil {
				log.Println(err)
				return
			}
		}
	}
}


At the moment I have tested this code on 100 records and everything worked without problems; in tests with 200 records a warning appeared: pq: sorry, too many clients already. Is this exactly the kind of case where the transactions you mention are needed? Still, everything in the database did get changed, though not all at once but somehow in parts, some of it with a second's delay. I didn't notice any other anomalies. In theory, how many goroutines can you create? It depends on the machine, am I right?

Could you please give an example with workers, so the conversation stays concrete? - Orlando commented on March 19th 20 at 09:14
https://gobyexample.com/worker-pools - here is the classic example.
Where that example sends ints over the channel, you need to pass the tx (I think that's important) and the request parameters. Into the channel that collects the results you can send an ok bool.

It would also be good to have a channel for shutting the workers down (most likely of type chan struct{}), which can be closed if a result comes back false. In that case, do a rollback and close the chan struct{} channel.

I think that's roughly the right direction? - Queenie.Rau25 commented on March 19th 20 at 09:17
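A hypothetical sketch of that scheme, with run and updateOne as made-up names (updateOne stands in for the per-product update executed against tx); it is only meant to show the channels, not to be a finished implementation:

func run(db *gorm.DB, products []models.Product, workers int) error {
	tx := db.Begin()
	jobs := make(chan models.Product, len(products))
	results := make(chan bool, len(products))
	done := make(chan struct{})

	for w := 0; w < workers; w++ {
		go func() {
			for {
				select {
				case <-done:
					return
				case p, open := <-jobs:
					if !open {
						return
					}
					// updateOne is a placeholder for the per-product update run against tx.
					results <- updateOne(tx, p) == nil
				}
			}
		}()
	}

	for _, p := range products {
		jobs <- p
	}
	close(jobs)

	// Collect one result per product; on the first failure, stop and roll back.
	for range products {
		if !<-results {
			close(done)
			tx.Rollback()
			return fmt.Errorf("processing failed, transaction rolled back")
		}
	}
	return tx.Commit().Error
}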
@Queenie.Rau25, Leonid, based on this answer I wrote the following code. Could you please take a look at it and tell me what problems you see? Honestly, I don't know how to change the current code into a single query. What can you recommend? Do I understand correctly that if, for example, I make 200 update requests, essentially 200 connections are opened? Hence the warning pq: sorry, too many clients already. It is assumed that the table I'm working with will always hold 200-300 records. On the database side I raised the number of connections to 500. Now, with 200 update requests, the warnings are gone.

var Tracker = func() {
	// Initialize the slice.
	var products []models.Product

	// Get the products list from the database.
	if err := db.Find(&products).Error; err != nil {
		log.Fatal(err)
	}

	currentTime := time.Now().In(timezone).UTC().Add(6 * time.Hour)

	// Create a buffered channel.
	channel := make(chan models.Product, len(products))

	// Start 5 workers.
	for w := 1; w <= 5; w++ {
		go Worker(w, channel, currentTime)
	}

	// Send the products into the channel.
	for _, product := range products {
		channel <- product
	}
	close(channel)
}

func Worker(workerID int, channel <-chan models.Product, currentTime time.Time) {
	fmt.Println("worker", workerID, "started job")

	for product := range channel {
		startPeriod := product.StartPeriod.UTC()
		endPeriod := product.EndPeriod.UTC()

		if !product.Blocked {
			// The period has started but not ended yet: block the product.
			if startPeriod.Before(currentTime) && endPeriod.After(currentTime) {
				if err := database.DBGORM.Find(&product).Error; err != nil {
					log.Println(err)
					return
				}
				if err := database.DBGORM.Model(&product).Update("blocked", true).Error; err != nil {
					log.Println(err)
					return
				}
			}
		} else {
			// The period is over: move the product to condition 3.
			if endPeriod.Before(currentTime) {
				if err := database.DBGORM.Find(&product).Error; err != nil {
					log.Println(err)
					return
				}
				if err := database.DBGORM.Model(&product).Update("condition", 3).Error; err != nil {
					log.Println(err)
					return
				}
			}
		}
	}

	fmt.Println("worker", workerID, "finished job")
}
- Orlando commented on March 19th 20 at 09:20
@Orlando, I would do a plain for loop with a select inside that listens on two channels: one as now, the other for completion. Like this.

func (o Obj) Worker(lp string) {
	for {
		select {
		case <-o.doneChan:
			return
		case m := <-o.inChan:
			if e := o.do(lp, m); e != nil {
				o.errChan <- e
			}
		}
	}
}


In addition, I suggest making a channel (o.errChan); by listening on it you can catch an error and close doneChan to roll back the changes.
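A rough sketch of such a listener, with o.tx as an assumed field holding the transaction (hypothetical names, not a finished design):

// On the first error: log it, roll the transaction back, and stop the workers.
go func() {
	if err := <-o.errChan; err != nil {
		log.Println(err)
		o.tx.Rollback()
		close(o.doneChan)
	}
}()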

To increase the number of connections is not very correct, IMHO.

And, in my opinion, you need a transaction. Otherwise there is a chance of changing only part of the records.

Well, in general, with 200-300 records there is no point in any of this. Processing them synchronously will be faster. - Queenie.Rau25 commented on March 19th 20 at 09:23
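As a footnote to the point about connections above, one assumption on my part: with the jinzhu/gorm v1 API used in this thread, DB() exposes the underlying *sql.DB, so the client-side pool can be capped instead of raising max_connections on the server, for example:

// Let the goroutines share a small pool instead of each grabbing its own connection.
sqlDB := database.DBGORM.DB()
sqlDB.SetMaxOpenConns(10)
sqlDB.SetMaxIdleConns(10)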
@Queenie.Rau25, Leonid, I agree that increasing the number of connections is not a good idea. Excuse me, but where does the function that catches the error go? I'm a little confused. I run this script every minute via CRON, because records need to be updated depending on user activity. - Orlando commented on March 19th 20 at 09:26
@Orlando, I think it would be a separate goroutine that can be started somewhere close to the workers.

Also, it is rather strange to run a Go program from cron :-) and a concurrent one at that. Why not use a time.Ticker? - Queenie.Rau25 commented on March 19th 20 at 09:29
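For reference, a minimal sketch of driving Tracker from a time.Ticker instead of cron, assuming the Tracker function from the question:

// Run Tracker once a minute from inside the long-running process itself.
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for range ticker.C {
	Tracker()
}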
@Queenie.Rau25, it's not quite CRON in its purest form. I use the library github.com/mileusna/crontab, which starts the script on a timer. I looked under the hood of that library and it uses the very time.Ticker you mentioned. Could you clarify what o Obj and the lp string argument are in your function? - Orlando commented on March 19th 20 at 09:32
@Orlando, into o I put everything that doesn't need to be passed through parameters. In this case it isn't strictly necessary, since we only have db/tx. lp is just my own leftover, I simply didn't remove it. - Queenie.Rau25 commented on March 19th 20 at 09:35
